diff options
Diffstat (limited to 'synapse')
143 files changed, 1811 insertions, 1145 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index c371e8f3c4..f5cd8271a6 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -36,7 +36,7 @@ try: except ImportError: pass -__version__ = "1.15.2" +__version__ = "1.16.0rc1" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when diff --git a/synapse/_scripts/register_new_matrix_user.py b/synapse/_scripts/register_new_matrix_user.py index d528450c78..55cce2db22 100644 --- a/synapse/_scripts/register_new_matrix_user.py +++ b/synapse/_scripts/register_new_matrix_user.py @@ -23,8 +23,6 @@ import hmac import logging import sys -from six.moves import input - import requests as _requests import yaml diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 06ade25674..06ba6604f3 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -16,8 +16,6 @@ import logging from typing import Optional -from six import itervalues - import pymacaroons from netaddr import IPAddress @@ -90,7 +88,7 @@ class Auth(object): event, prev_state_ids, for_verification=True ) auth_events = yield self.store.get_events(auth_events_ids) - auth_events = {(e.type, e.state_key): e for e in itervalues(auth_events)} + auth_events = {(e.type, e.state_key): e for e in auth_events.values()} room_version_obj = KNOWN_ROOM_VERSIONS[room_version] event_auth.check( diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 5ec4a77ccd..6a6d32c302 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -150,3 +150,8 @@ class EventContentFields(object): # Timestamp to delete the event after # cf https://github.com/matrix-org/matrix-doc/pull/2228 SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after" + + +class RoomEncryptionAlgorithms(object): + MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2" + DEFAULT = MEGOLM_V1_AES_SHA2 diff --git a/synapse/api/errors.py b/synapse/api/errors.py index d54dfb385d..5305038c21 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -17,11 +17,9 @@ """Contains exceptions and error codes.""" import logging +from http import HTTPStatus from typing import Dict, List -from six import iteritems -from six.moves import http_client - from canonicaljson import json from twisted.web import http @@ -174,7 +172,7 @@ class ConsentNotGivenError(SynapseError): consent_url (str): The URL where the user can give their consent """ super(ConsentNotGivenError, self).__init__( - code=http_client.FORBIDDEN, msg=msg, errcode=Codes.CONSENT_NOT_GIVEN + code=HTTPStatus.FORBIDDEN, msg=msg, errcode=Codes.CONSENT_NOT_GIVEN ) self._consent_uri = consent_uri @@ -194,7 +192,7 @@ class UserDeactivatedError(SynapseError): msg (str): The human-readable error message """ super(UserDeactivatedError, self).__init__( - code=http_client.FORBIDDEN, msg=msg, errcode=Codes.USER_DEACTIVATED + code=HTTPStatus.FORBIDDEN, msg=msg, errcode=Codes.USER_DEACTIVATED ) @@ -497,7 +495,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs): A dict representing the error response JSON. """ err = {"error": msg, "errcode": code} - for key, value in iteritems(kwargs): + for key, value in kwargs.items(): err[key] = value return err diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 8b64d0a285..f988f62a1e 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -17,8 +17,6 @@ # limitations under the License. from typing import List -from six import text_type - import jsonschema from canonicaljson import json from jsonschema import FormatChecker @@ -313,7 +311,7 @@ class Filter(object): content = event.get("content", {}) # check if there is a string url field in the content for filtering purposes - contains_url = isinstance(content.get("url"), text_type) + contains_url = isinstance(content.get("url"), str) labels = content.get(EventContentFields.LABELS, []) return self.check_fields(room_id, sender, ev_type, labels, contains_url) diff --git a/synapse/api/urls.py b/synapse/api/urls.py index f34434bd67..bd03ebca5a 100644 --- a/synapse/api/urls.py +++ b/synapse/api/urls.py @@ -17,8 +17,7 @@ """Contains the URL paths to prefix various aspects of the server with. """ import hmac from hashlib import sha256 - -from six.moves.urllib.parse import urlencode +from urllib.parse import urlencode from synapse.config import ConfigError diff --git a/synapse/app/_base.py b/synapse/app/_base.py index dedff81af3..373a80a4a7 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -20,6 +20,7 @@ import signal import socket import sys import traceback +from typing import Iterable from daemonize import Daemonize from typing_extensions import NoReturn @@ -29,6 +30,7 @@ from twisted.protocols.tls import TLSMemoryBIOFactory import synapse from synapse.app import check_bind_error +from synapse.config.server import ListenerConfig from synapse.crypto import context_factory from synapse.logging.context import PreserveLoggingContext from synapse.util.async_helpers import Linearizer @@ -234,7 +236,7 @@ def refresh_certificate(hs): logger.info("Context factories updated.") -def start(hs, listeners=None): +def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]): """ Start a Synapse server or worker. @@ -245,8 +247,8 @@ def start(hs, listeners=None): notify systemd. Args: - hs (synapse.server.HomeServer) - listeners (list[dict]): Listener configuration ('listeners' in homeserver.yaml) + hs: homeserver instance + listeners: Listener configuration ('listeners' in homeserver.yaml) """ try: # Set up the SIGHUP machinery. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index f3ec2a34ec..27a3fc9ed6 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -37,6 +37,7 @@ from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging +from synapse.config.server import ListenerConfig from synapse.federation import send_queue from synapse.federation.transport.server import TransportLayerServer from synapse.handlers.presence import ( @@ -514,13 +515,18 @@ class GenericWorkerSlavedStore( class GenericWorkerServer(HomeServer): DATASTORE_CLASS = GenericWorkerSlavedStore - def _listen_http(self, listener_config): - port = listener_config["port"] - bind_addresses = listener_config["bind_addresses"] - site_tag = listener_config.get("tag", port) + def _listen_http(self, listener_config: ListenerConfig): + port = listener_config.port + bind_addresses = listener_config.bind_addresses + + assert listener_config.http_options is not None + + site_tag = listener_config.http_options.tag + if site_tag is None: + site_tag = port resources = {} - for res in listener_config["resources"]: - for name in res["names"]: + for res in listener_config.http_options.resources: + for name in res.names: if name == "metrics": resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": @@ -590,7 +596,7 @@ class GenericWorkerServer(HomeServer): " repository is disabled. Ignoring." ) - if name == "openid" and "federation" not in res["names"]: + if name == "openid" and "federation" not in res.names: # Only load the openid resource separately if federation resource # is not specified since federation resource includes openid # resource. @@ -625,19 +631,19 @@ class GenericWorkerServer(HomeServer): logger.info("Synapse worker now listening on port %d", port) - def start_listening(self, listeners): + def start_listening(self, listeners: Iterable[ListenerConfig]): for listener in listeners: - if listener["type"] == "http": + if listener.type == "http": self._listen_http(listener) - elif listener["type"] == "manhole": + elif listener.type == "manhole": _base.listen_tcp( - listener["bind_addresses"], - listener["port"], + listener.bind_addresses, + listener.port, manhole( username="matrix", password="rabbithole", globals={"hs": self} ), ) - elif listener["type"] == "metrics": + elif listener.type == "metrics": if not self.get_config().enable_metrics: logger.warning( ( @@ -646,9 +652,9 @@ class GenericWorkerServer(HomeServer): ) ) else: - _base.listen_metrics(listener["bind_addresses"], listener["port"]) + _base.listen_metrics(listener.bind_addresses, listener.port) else: - logger.warning("Unrecognized listener type: %s", listener["type"]) + logger.warning("Unsupported listener type: %s", listener.type) self.get_tcp_replication().start_replication(self) @@ -738,6 +744,11 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): except Exception: logger.exception("Error processing replication") + async def on_position(self, stream_name: str, instance_name: str, token: int): + await super().on_position(stream_name, instance_name, token) + # Also call on_rdata to ensure that stream positions are properly reset. + await self.on_rdata(stream_name, instance_name, token, []) + def stop_pusher(self, user_id, app_id, pushkey): if not self.notify_pushers: return diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 41994dc14b..09291d86ad 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -23,8 +23,7 @@ import math import os import resource import sys - -from six import iteritems +from typing import Iterable from prometheus_client import Gauge @@ -50,6 +49,7 @@ from synapse.app import _base from synapse.app._base import listen_ssl, listen_tcp, quit_with_error from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig +from synapse.config.server import ListenerConfig from synapse.federation.transport.server import TransportLayerServer from synapse.http.additional_resource import AdditionalResource from synapse.http.server import ( @@ -90,24 +90,24 @@ def gz_wrap(r): class SynapseHomeServer(HomeServer): DATASTORE_CLASS = DataStore - def _listener_http(self, config, listener_config): - port = listener_config["port"] - bind_addresses = listener_config["bind_addresses"] - tls = listener_config.get("tls", False) - site_tag = listener_config.get("tag", port) + def _listener_http(self, config: HomeServerConfig, listener_config: ListenerConfig): + port = listener_config.port + bind_addresses = listener_config.bind_addresses + tls = listener_config.tls + site_tag = listener_config.http_options.tag + if site_tag is None: + site_tag = port resources = {} - for res in listener_config["resources"]: - for name in res["names"]: - if name == "openid" and "federation" in res["names"]: + for res in listener_config.http_options.resources: + for name in res.names: + if name == "openid" and "federation" in res.names: # Skip loading openid resource if federation is defined # since federation resource will include openid continue - resources.update( - self._configure_named_resource(name, res.get("compress", False)) - ) + resources.update(self._configure_named_resource(name, res.compress)) - additional_resources = listener_config.get("additional_resources", {}) + additional_resources = listener_config.http_options.additional_resources logger.debug("Configuring additional resources: %r", additional_resources) module_api = ModuleApi(self, self.get_auth_handler()) for path, resmodule in additional_resources.items(): @@ -279,7 +279,7 @@ class SynapseHomeServer(HomeServer): return resources - def start_listening(self, listeners): + def start_listening(self, listeners: Iterable[ListenerConfig]): config = self.get_config() if config.redis_enabled: @@ -289,25 +289,25 @@ class SynapseHomeServer(HomeServer): self.get_tcp_replication().start_replication(self) for listener in listeners: - if listener["type"] == "http": + if listener.type == "http": self._listening_services.extend(self._listener_http(config, listener)) - elif listener["type"] == "manhole": + elif listener.type == "manhole": listen_tcp( - listener["bind_addresses"], - listener["port"], + listener.bind_addresses, + listener.port, manhole( username="matrix", password="rabbithole", globals={"hs": self} ), ) - elif listener["type"] == "replication": + elif listener.type == "replication": services = listen_tcp( - listener["bind_addresses"], - listener["port"], + listener.bind_addresses, + listener.port, ReplicationStreamProtocolFactory(self), ) for s in services: reactor.addSystemEventTrigger("before", "shutdown", s.stopListening) - elif listener["type"] == "metrics": + elif listener.type == "metrics": if not self.get_config().enable_metrics: logger.warning( ( @@ -316,9 +316,11 @@ class SynapseHomeServer(HomeServer): ) ) else: - _base.listen_metrics(listener["bind_addresses"], listener["port"]) + _base.listen_metrics(listener.bind_addresses, listener.port) else: - logger.warning("Unrecognized listener type: %s", listener["type"]) + # this shouldn't happen, as the listener type should have been checked + # during parsing + logger.warning("Unrecognized listener type: %s", listener.type) # Gauges to expose monthly active user control metrics @@ -526,7 +528,7 @@ def phone_stats_home(hs, stats, stats_process=_stats_process): stats["total_nonbridged_users"] = total_nonbridged_users daily_user_type_results = yield hs.get_datastore().count_daily_user_type() - for name, count in iteritems(daily_user_type_results): + for name, count in daily_user_type_results.items(): stats["daily_user_type_" + name] = count room_count = yield hs.get_datastore().get_room_count() @@ -538,7 +540,7 @@ def phone_stats_home(hs, stats, stats_process=_stats_process): stats["daily_messages"] = yield hs.get_datastore().count_daily_messages() r30_results = yield hs.get_datastore().count_r30_users() - for name, count in iteritems(r30_results): + for name, count in r30_results.items(): stats["r30_users_" + name] = count daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages() diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 1b13e84425..0323256472 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -15,8 +15,6 @@ import logging import re -from six import string_types - from twisted.internet import defer from synapse.api.constants import EventTypes @@ -156,7 +154,7 @@ class ApplicationService(object): ) regex = regex_obj.get("regex") - if isinstance(regex, string_types): + if isinstance(regex, str): regex_obj["regex"] = re.compile(regex) # Pre-compile regex else: raise ValueError("Expected string for 'regex' in ns '%s'" % ns) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 57174da021..da9a5e86d4 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -13,8 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging - -from six.moves import urllib +import urllib from prometheus_client import Counter diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 30d1050a91..1391e5fc43 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -22,8 +22,6 @@ from collections import OrderedDict from textwrap import dedent from typing import Any, MutableMapping, Optional -from six import integer_types - import yaml @@ -117,7 +115,7 @@ class Config(object): @staticmethod def parse_size(value): - if isinstance(value, integer_types): + if isinstance(value, int): return value sizes = {"K": 1024, "M": 1024 * 1024} size = 1 @@ -129,7 +127,7 @@ class Config(object): @staticmethod def parse_duration(value): - if isinstance(value, integer_types): + if isinstance(value, int): return value second = 1000 minute = 60 * second diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index ca43e96bd1..8ed3e24258 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -14,9 +14,7 @@ import logging from typing import Dict - -from six import string_types -from six.moves.urllib import parse as urlparse +from urllib import parse as urlparse import yaml from netaddr import IPSet @@ -98,17 +96,14 @@ def load_appservices(hostname, config_files): def _load_appservice(hostname, as_info, config_filename): required_string_fields = ["id", "as_token", "hs_token", "sender_localpart"] for field in required_string_fields: - if not isinstance(as_info.get(field), string_types): + if not isinstance(as_info.get(field), str): raise KeyError( "Required string field: '%s' (%s)" % (field, config_filename) ) # 'url' must either be a string or explicitly null, not missing # to avoid accidentally turning off push for ASes. - if ( - not isinstance(as_info.get("url"), string_types) - and as_info.get("url", "") is not None - ): + if not isinstance(as_info.get("url"), str) and as_info.get("url", "") is not None: raise KeyError( "Required string field or explicit null: 'url' (%s)" % (config_filename,) ) @@ -138,7 +133,7 @@ def _load_appservice(hostname, as_info, config_filename): ns, regex_obj, ) - if not isinstance(regex_obj.get("regex"), string_types): + if not isinstance(regex_obj.get("regex"), str): raise ValueError("Missing/bad type 'regex' key in %s", regex_obj) if not isinstance(regex_obj.get("exclusive"), bool): raise ValueError( diff --git a/synapse/config/cache.py b/synapse/config/cache.py index 0672538796..aff5b21ab2 100644 --- a/synapse/config/cache.py +++ b/synapse/config/cache.py @@ -15,6 +15,7 @@ import os import re +import threading from typing import Callable, Dict from ._base import Config, ConfigError @@ -25,6 +26,9 @@ _CACHE_PREFIX = "SYNAPSE_CACHE_FACTOR" # Map from canonicalised cache name to cache. _CACHES = {} +# a lock on the contents of _CACHES +_CACHES_LOCK = threading.Lock() + _DEFAULT_FACTOR_SIZE = 0.5 _DEFAULT_EVENT_CACHE_SIZE = "10K" @@ -66,7 +70,10 @@ def add_resizable_cache(cache_name: str, cache_resize_callback: Callable): # Some caches have '*' in them which we strip out. cache_name = _canonicalise_cache_name(cache_name) - _CACHES[cache_name] = cache_resize_callback + # sometimes caches are initialised from background threads, so we need to make + # sure we don't conflict with another thread running a resize operation + with _CACHES_LOCK: + _CACHES[cache_name] = cache_resize_callback # Ensure all loaded caches are sized appropriately # @@ -87,7 +94,8 @@ class CacheConfig(Config): os.environ.get(_CACHE_PREFIX, _DEFAULT_FACTOR_SIZE) ) properties.resize_all_caches_func = None - _CACHES.clear() + with _CACHES_LOCK: + _CACHES.clear() def generate_config_section(self, **kwargs): return """\ @@ -193,6 +201,8 @@ class CacheConfig(Config): For each cache, run the mapped callback function with either a specific cache factor or the default, global one. """ - for cache_name, callback in _CACHES.items(): - new_factor = self.cache_factors.get(cache_name, self.global_factor) - callback(new_factor) + # block other threads from modifying _CACHES while we iterate it. + with _CACHES_LOCK: + for cache_name, callback in _CACHES.items(): + new_factor = self.cache_factors.get(cache_name, self.global_factor) + callback(new_factor) diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 2c7b3a699f..264c274c52 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -36,6 +36,7 @@ from .ratelimiting import RatelimitConfig from .redis import RedisConfig from .registration import RegistrationConfig from .repository import ContentRepositoryConfig +from .room import RoomConfig from .room_directory import RoomDirectoryConfig from .saml2_config import SAML2Config from .server import ServerConfig @@ -79,6 +80,7 @@ class HomeServerConfig(RootConfig): PasswordAuthProviderConfig, PushConfig, SpamCheckerConfig, + RoomConfig, GroupsConfig, UserDirectoryConfig, ConsentConfig, diff --git a/synapse/config/oidc_config.py b/synapse/config/oidc_config.py index e24dd637bc..e0939bce84 100644 --- a/synapse/config/oidc_config.py +++ b/synapse/config/oidc_config.py @@ -89,7 +89,7 @@ class OIDCConfig(Config): # use an OpenID Connect Provider for authentication, instead of its internal # password database. # - # See https://github.com/matrix-org/synapse/blob/master/openid.md. + # See https://github.com/matrix-org/synapse/blob/master/docs/openid.md. # oidc_config: # Uncomment the following to enable authorization against an OpenID Connect diff --git a/synapse/config/registration.py b/synapse/config/registration.py index fecced2d57..6badf4e75d 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -18,8 +18,9 @@ from distutils.util import strtobool import pkg_resources +from synapse.api.constants import RoomCreationPreset from synapse.config._base import Config, ConfigError -from synapse.types import RoomAlias +from synapse.types import RoomAlias, UserID from synapse.util.stringutils import random_string_with_symbols @@ -127,7 +128,50 @@ class RegistrationConfig(Config): for room_alias in self.auto_join_rooms: if not RoomAlias.is_valid(room_alias): raise ConfigError("Invalid auto_join_rooms entry %s" % (room_alias,)) + + # Options for creating auto-join rooms if they do not exist yet. self.autocreate_auto_join_rooms = config.get("autocreate_auto_join_rooms", True) + self.autocreate_auto_join_rooms_federated = config.get( + "autocreate_auto_join_rooms_federated", True + ) + self.autocreate_auto_join_room_preset = ( + config.get("autocreate_auto_join_room_preset") + or RoomCreationPreset.PUBLIC_CHAT + ) + self.auto_join_room_requires_invite = self.autocreate_auto_join_room_preset in { + RoomCreationPreset.PRIVATE_CHAT, + RoomCreationPreset.TRUSTED_PRIVATE_CHAT, + } + + # Pull the creater/inviter from the configuration, this gets used to + # send invites for invite-only rooms. + mxid_localpart = config.get("auto_join_mxid_localpart") + self.auto_join_user_id = None + if mxid_localpart: + # Convert the localpart to a full mxid. + self.auto_join_user_id = UserID( + mxid_localpart, self.server_name + ).to_string() + + if self.autocreate_auto_join_rooms: + # Ensure the preset is a known value. + if self.autocreate_auto_join_room_preset not in { + RoomCreationPreset.PUBLIC_CHAT, + RoomCreationPreset.PRIVATE_CHAT, + RoomCreationPreset.TRUSTED_PRIVATE_CHAT, + }: + raise ConfigError("Invalid value for autocreate_auto_join_room_preset") + # If the preset requires invitations to be sent, ensure there's a + # configured user to send them from. + if self.auto_join_room_requires_invite: + if not mxid_localpart: + raise ConfigError( + "The configuration option `auto_join_mxid_localpart` is required if " + "`autocreate_auto_join_room_preset` is set to private_chat or trusted_private_chat, such that " + "Synapse knows who to send invitations from. Please " + "configure `auto_join_mxid_localpart`." + ) + self.auto_join_rooms_for_guests = config.get("auto_join_rooms_for_guests", True) self.enable_set_displayname = config.get("enable_set_displayname", True) @@ -357,7 +401,11 @@ class RegistrationConfig(Config): #enable_3pid_changes: false # Users who register on this homeserver will automatically be joined - # to these rooms + # to these rooms. + # + # By default, any room aliases included in this list will be created + # as a publicly joinable room when the first user registers for the + # homeserver. This behaviour can be customised with the settings below. # #auto_join_rooms: # - "#example:example.com" @@ -365,10 +413,62 @@ class RegistrationConfig(Config): # Where auto_join_rooms are specified, setting this flag ensures that the # the rooms exist by creating them when the first user on the # homeserver registers. + # + # By default the auto-created rooms are publicly joinable from any federated + # server. Use the autocreate_auto_join_rooms_federated and + # autocreate_auto_join_room_preset settings below to customise this behaviour. + # # Setting to false means that if the rooms are not manually created, # users cannot be auto-joined since they do not exist. # - #autocreate_auto_join_rooms: true + # Defaults to true. Uncomment the following line to disable automatically + # creating auto-join rooms. + # + #autocreate_auto_join_rooms: false + + # Whether the auto_join_rooms that are auto-created are available via + # federation. Only has an effect if autocreate_auto_join_rooms is true. + # + # Note that whether a room is federated cannot be modified after + # creation. + # + # Defaults to true: the room will be joinable from other servers. + # Uncomment the following to prevent users from other homeservers from + # joining these rooms. + # + #autocreate_auto_join_rooms_federated: false + + # The room preset to use when auto-creating one of auto_join_rooms. Only has an + # effect if autocreate_auto_join_rooms is true. + # + # This can be one of "public_chat", "private_chat", or "trusted_private_chat". + # If a value of "private_chat" or "trusted_private_chat" is used then + # auto_join_mxid_localpart must also be configured. + # + # Defaults to "public_chat", meaning that the room is joinable by anyone, including + # federated servers if autocreate_auto_join_rooms_federated is true (the default). + # Uncomment the following to require an invitation to join these rooms. + # + #autocreate_auto_join_room_preset: private_chat + + # The local part of the user id which is used to create auto_join_rooms if + # autocreate_auto_join_rooms is true. If this is not provided then the + # initial user account that registers will be used to create the rooms. + # + # The user id is also used to invite new users to any auto-join rooms which + # are set to invite-only. + # + # It *must* be configured if autocreate_auto_join_room_preset is set to + # "private_chat" or "trusted_private_chat". + # + # Note that this must be specified in order for new users to be correctly + # invited to any auto-join rooms which have been set to invite-only (either + # at the time of creation or subsequently). + # + # Note that, if the room already exists, this user must be joined and + # have the appropriate permissions to invite new members. + # + #auto_join_mxid_localpart: system # When auto_join_rooms is specified, setting this flag to false prevents # guest accounts from being automatically joined to the rooms. diff --git a/synapse/config/repository.py b/synapse/config/repository.py index b751d02d37..01009f3924 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -94,6 +94,12 @@ class ContentRepositoryConfig(Config): else: self.can_load_media_repo = True + # Whether this instance should be the one to run the background jobs to + # e.g clean up old URL previews. + self.media_instance_running_background_jobs = config.get( + "media_instance_running_background_jobs", + ) + self.max_upload_size = self.parse_size(config.get("max_upload_size", "10M")) self.max_image_pixels = self.parse_size(config.get("max_image_pixels", "32M")) self.max_spider_size = self.parse_size(config.get("max_spider_size", "10M")) diff --git a/synapse/config/room.py b/synapse/config/room.py new file mode 100644 index 0000000000..6aa4de0672 --- /dev/null +++ b/synapse/config/room.py @@ -0,0 +1,80 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from synapse.api.constants import RoomCreationPreset + +from ._base import Config, ConfigError + +logger = logging.Logger(__name__) + + +class RoomDefaultEncryptionTypes(object): + """Possible values for the encryption_enabled_by_default_for_room_type config option""" + + ALL = "all" + INVITE = "invite" + OFF = "off" + + +class RoomConfig(Config): + section = "room" + + def read_config(self, config, **kwargs): + # Whether new, locally-created rooms should have encryption enabled + encryption_for_room_type = config.get( + "encryption_enabled_by_default_for_room_type", + RoomDefaultEncryptionTypes.OFF, + ) + if encryption_for_room_type == RoomDefaultEncryptionTypes.ALL: + self.encryption_enabled_by_default_for_room_presets = [ + RoomCreationPreset.PRIVATE_CHAT, + RoomCreationPreset.TRUSTED_PRIVATE_CHAT, + RoomCreationPreset.PUBLIC_CHAT, + ] + elif encryption_for_room_type == RoomDefaultEncryptionTypes.INVITE: + self.encryption_enabled_by_default_for_room_presets = [ + RoomCreationPreset.PRIVATE_CHAT, + RoomCreationPreset.TRUSTED_PRIVATE_CHAT, + ] + elif encryption_for_room_type == RoomDefaultEncryptionTypes.OFF: + self.encryption_enabled_by_default_for_room_presets = [] + else: + raise ConfigError( + "Invalid value for encryption_enabled_by_default_for_room_type" + ) + + def generate_config_section(self, **kwargs): + return """\ + ## Rooms ## + + # Controls whether locally-created rooms should be end-to-end encrypted by + # default. + # + # Possible options are "all", "invite", and "off". They are defined as: + # + # * "all": any locally-created room + # * "invite": any room created with the "private_chat" or "trusted_private_chat" + # room creation presets + # * "off": this option will take no effect + # + # The default value is "off". + # + # Note that this option will only affect rooms created after it is set. It + # will also not affect rooms created by other servers. + # + #encryption_enabled_by_default_for_room_type: invite + """ diff --git a/synapse/config/saml2_config.py b/synapse/config/saml2_config.py index d0a19751e8..293643b2de 100644 --- a/synapse/config/saml2_config.py +++ b/synapse/config/saml2_config.py @@ -160,7 +160,7 @@ class SAML2Config(Config): # session lifetime: in milliseconds self.saml2_session_lifetime = self.parse_duration( - saml2_config.get("saml_session_lifetime", "5m") + saml2_config.get("saml_session_lifetime", "15m") ) template_dir = saml2_config.get("template_dir") @@ -286,7 +286,7 @@ class SAML2Config(Config): # The lifetime of a SAML session. This defines how long a user has to # complete the authentication process, if allow_unsolicited is unset. - # The default is 5 minutes. + # The default is 15 minutes. # #saml_session_lifetime: 5m diff --git a/synapse/config/server.py b/synapse/config/server.py index f57eefc99c..8204664883 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -19,7 +19,7 @@ import logging import os.path import re from textwrap import indent -from typing import Dict, List, Optional +from typing import Any, Dict, Iterable, List, Optional import attr import yaml @@ -57,6 +57,64 @@ on how to configure the new listener. --------------------------------------------------------------------------------""" +KNOWN_LISTENER_TYPES = { + "http", + "metrics", + "manhole", + "replication", +} + +KNOWN_RESOURCES = { + "client", + "consent", + "federation", + "keys", + "media", + "metrics", + "openid", + "replication", + "static", + "webclient", +} + + +@attr.s(frozen=True) +class HttpResourceConfig: + names = attr.ib( + type=List[str], + factory=list, + validator=attr.validators.deep_iterable(attr.validators.in_(KNOWN_RESOURCES)), # type: ignore + ) + compress = attr.ib( + type=bool, + default=False, + validator=attr.validators.optional(attr.validators.instance_of(bool)), # type: ignore[arg-type] + ) + + +@attr.s(frozen=True) +class HttpListenerConfig: + """Object describing the http-specific parts of the config of a listener""" + + x_forwarded = attr.ib(type=bool, default=False) + resources = attr.ib(type=List[HttpResourceConfig], factory=list) + additional_resources = attr.ib(type=Dict[str, dict], factory=dict) + tag = attr.ib(type=str, default=None) + + +@attr.s(frozen=True) +class ListenerConfig: + """Object describing the configuration of a single listener.""" + + port = attr.ib(type=int, validator=attr.validators.instance_of(int)) + bind_addresses = attr.ib(type=List[str]) + type = attr.ib(type=str, validator=attr.validators.in_(KNOWN_LISTENER_TYPES)) + tls = attr.ib(type=bool, default=False) + + # http_options is only populated if type=http + http_options = attr.ib(type=Optional[HttpListenerConfig], default=None) + + class ServerConfig(Config): section = "server" @@ -379,38 +437,21 @@ class ServerConfig(Config): } ] - self.listeners = [] # type: List[dict] - for listener in config.get("listeners", []): - if not isinstance(listener.get("port", None), int): - raise ConfigError( - "Listener configuration is lacking a valid 'port' option" - ) + self.listeners = [parse_listener_def(x) for x in config.get("listeners", [])] - if listener.setdefault("tls", False): - # no_tls is not really supported any more, but let's grandfather it in - # here. - if config.get("no_tls", False): + # no_tls is not really supported any more, but let's grandfather it in + # here. + if config.get("no_tls", False): + l2 = [] + for listener in self.listeners: + if listener.tls: logger.info( - "Ignoring TLS-enabled listener on port %i due to no_tls" + "Ignoring TLS-enabled listener on port %i due to no_tls", + listener.port, ) - continue - - bind_address = listener.pop("bind_address", None) - bind_addresses = listener.setdefault("bind_addresses", []) - - # if bind_address was specified, add it to the list of addresses - if bind_address: - bind_addresses.append(bind_address) - - # if we still have an empty list of addresses, use the default list - if not bind_addresses: - if listener["type"] == "metrics": - # the metrics listener doesn't support IPv6 - bind_addresses.append("0.0.0.0") else: - bind_addresses.extend(DEFAULT_BIND_ADDRESSES) - - self.listeners.append(listener) + l2.append(listener) + self.listeners = l2 if not self.web_client_location: _warn_if_webclient_configured(self.listeners) @@ -446,43 +487,41 @@ class ServerConfig(Config): bind_host = config.get("bind_host", "") gzip_responses = config.get("gzip_responses", True) + http_options = HttpListenerConfig( + resources=[ + HttpResourceConfig(names=["client"], compress=gzip_responses), + HttpResourceConfig(names=["federation"]), + ], + ) + self.listeners.append( - { - "port": bind_port, - "bind_addresses": [bind_host], - "tls": True, - "type": "http", - "resources": [ - {"names": ["client"], "compress": gzip_responses}, - {"names": ["federation"], "compress": False}, - ], - } + ListenerConfig( + port=bind_port, + bind_addresses=[bind_host], + tls=True, + type="http", + http_options=http_options, + ) ) unsecure_port = config.get("unsecure_port", bind_port - 400) if unsecure_port: self.listeners.append( - { - "port": unsecure_port, - "bind_addresses": [bind_host], - "tls": False, - "type": "http", - "resources": [ - {"names": ["client"], "compress": gzip_responses}, - {"names": ["federation"], "compress": False}, - ], - } + ListenerConfig( + port=unsecure_port, + bind_addresses=[bind_host], + tls=False, + type="http", + http_options=http_options, + ) ) manhole = config.get("manhole") if manhole: self.listeners.append( - { - "port": manhole, - "bind_addresses": ["127.0.0.1"], - "type": "manhole", - "tls": False, - } + ListenerConfig( + port=manhole, bind_addresses=["127.0.0.1"], type="manhole", + ) ) metrics_port = config.get("metrics_port") @@ -490,13 +529,14 @@ class ServerConfig(Config): logger.warning(METRICS_PORT_WARNING) self.listeners.append( - { - "port": metrics_port, - "bind_addresses": [config.get("metrics_bind_host", "127.0.0.1")], - "tls": False, - "type": "http", - "resources": [{"names": ["metrics"], "compress": False}], - } + ListenerConfig( + port=metrics_port, + bind_addresses=[config.get("metrics_bind_host", "127.0.0.1")], + type="http", + http_options=HttpListenerConfig( + resources=[HttpResourceConfig(names=["metrics"])] + ), + ) ) _check_resource_config(self.listeners) @@ -522,7 +562,7 @@ class ServerConfig(Config): ) def has_tls_listener(self) -> bool: - return any(listener["tls"] for listener in self.listeners) + return any(listener.tls for listener in self.listeners) def generate_config_section( self, server_name, data_dir_path, open_private_ports, listeners, **kwargs @@ -856,7 +896,7 @@ class ServerConfig(Config): # number of monthly active users. # # 'limit_usage_by_mau' disables/enables monthly active user blocking. When - # anabled and a limit is reached the server returns a 'ResourceLimitError' + # enabled and a limit is reached the server returns a 'ResourceLimitError' # with error type Codes.RESOURCE_LIMIT_EXCEEDED # # 'max_mau_value' is the hard limit of monthly active users above which @@ -1081,6 +1121,44 @@ def read_gc_thresholds(thresholds): ) +def parse_listener_def(listener: Any) -> ListenerConfig: + """parse a listener config from the config file""" + listener_type = listener["type"] + + port = listener.get("port") + if not isinstance(port, int): + raise ConfigError("Listener configuration is lacking a valid 'port' option") + + tls = listener.get("tls", False) + + bind_addresses = listener.get("bind_addresses", []) + bind_address = listener.get("bind_address") + # if bind_address was specified, add it to the list of addresses + if bind_address: + bind_addresses.append(bind_address) + + # if we still have an empty list of addresses, use the default list + if not bind_addresses: + if listener_type == "metrics": + # the metrics listener doesn't support IPv6 + bind_addresses.append("0.0.0.0") + else: + bind_addresses.extend(DEFAULT_BIND_ADDRESSES) + + http_config = None + if listener_type == "http": + http_config = HttpListenerConfig( + x_forwarded=listener.get("x_forwarded", False), + resources=[ + HttpResourceConfig(**res) for res in listener.get("resources", []) + ], + additional_resources=listener.get("additional_resources", {}), + tag=listener.get("tag"), + ) + + return ListenerConfig(port, bind_addresses, listener_type, tls, http_config) + + NO_MORE_WEB_CLIENT_WARNING = """ Synapse no longer includes a web client. To enable a web client, configure web_client_location. To remove this warning, remove 'webclient' from the 'listeners' @@ -1088,40 +1166,27 @@ configuration. """ -def _warn_if_webclient_configured(listeners): +def _warn_if_webclient_configured(listeners: Iterable[ListenerConfig]) -> None: for listener in listeners: - for res in listener.get("resources", []): - for name in res.get("names", []): + if not listener.http_options: + continue + for res in listener.http_options.resources: + for name in res.names: if name == "webclient": logger.warning(NO_MORE_WEB_CLIENT_WARNING) return -KNOWN_RESOURCES = ( - "client", - "consent", - "federation", - "keys", - "media", - "metrics", - "openid", - "replication", - "static", - "webclient", -) - - -def _check_resource_config(listeners): +def _check_resource_config(listeners: Iterable[ListenerConfig]) -> None: resource_names = { res_name for listener in listeners - for res in listener.get("resources", []) - for res_name in res.get("names", []) + if listener.http_options + for res in listener.http_options.resources + for res_name in res.names } for resource in resource_names: - if resource not in KNOWN_RESOURCES: - raise ConfigError("Unknown listener resource '%s'" % (resource,)) if resource == "consent": try: check_requirements("resources.consent") diff --git a/synapse/config/tls.py b/synapse/config/tls.py index a65538562b..e368ea564d 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -20,8 +20,6 @@ from datetime import datetime from hashlib import sha256 from typing import List -import six - from unpaddedbase64 import encode_base64 from OpenSSL import SSL, crypto @@ -59,7 +57,7 @@ class TlsConfig(Config): logger.warning(ACME_SUPPORT_ENABLED_WARN) # hyperlink complains on py2 if this is not a Unicode - self.acme_url = six.text_type( + self.acme_url = str( acme_config.get("url", "https://acme-v01.api.letsencrypt.org/directory") ) self.acme_port = acme_config.get("port", 80) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index ed06b91a54..dbc661630c 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -16,6 +16,7 @@ import attr from ._base import Config, ConfigError +from .server import ListenerConfig, parse_listener_def @attr.s @@ -52,7 +53,9 @@ class WorkerConfig(Config): if self.worker_app == "synapse.app.homeserver": self.worker_app = None - self.worker_listeners = config.get("worker_listeners", []) + self.worker_listeners = [ + parse_listener_def(x) for x in config.get("worker_listeners", []) + ] self.worker_daemonize = config.get("worker_daemonize") self.worker_pid_file = config.get("worker_pid_file") self.worker_log_config = config.get("worker_log_config") @@ -75,24 +78,11 @@ class WorkerConfig(Config): manhole = config.get("worker_manhole") if manhole: self.worker_listeners.append( - { - "port": manhole, - "bind_addresses": ["127.0.0.1"], - "type": "manhole", - "tls": False, - } + ListenerConfig( + port=manhole, bind_addresses=["127.0.0.1"], type="manhole", + ) ) - if self.worker_listeners: - for listener in self.worker_listeners: - bind_address = listener.pop("bind_address", None) - bind_addresses = listener.setdefault("bind_addresses", []) - - if bind_address: - bind_addresses.append(bind_address) - elif not bind_addresses: - bind_addresses.append("") - # A map from instance name to host/port of their HTTP replication endpoint. instance_map = config.get("instance_map") or {} self.instance_map = { diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index a9f4025bfe..dbfc3e8972 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -15,11 +15,9 @@ # limitations under the License. import logging +import urllib from collections import defaultdict -import six -from six.moves import urllib - import attr from signedjson.key import ( decode_verify_key_bytes, @@ -661,7 +659,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher): for response in query_response["server_keys"]: # do this first, so that we can give useful errors thereafter server_name = response.get("server_name") - if not isinstance(server_name, six.string_types): + if not isinstance(server_name, str): raise KeyLookupError( "Malformed response from key notary server %s: invalid server_name" % (perspective_name,) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 533ba327f5..cc5deca75b 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -20,8 +20,6 @@ import os from distutils.util import strtobool from typing import Dict, Optional, Type -import six - from unpaddedbase64 import encode_base64 from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions @@ -290,7 +288,7 @@ class EventBase(metaclass=abc.ABCMeta): return list(self._dict.items()) def keys(self): - return six.iterkeys(self._dict) + return self._dict.keys() def prev_event_ids(self): """Returns the list of prev event IDs. The order matches the order diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 7c5f620d09..f94cdcbaba 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -14,8 +14,6 @@ # limitations under the License. from typing import Optional, Union -from six import iteritems - import attr from frozendict import frozendict @@ -341,7 +339,7 @@ def _encode_state_dict(state_dict): if state_dict is None: return None - return [(etype, state_key, v) for (etype, state_key), v in iteritems(state_dict)] + return [(etype, state_key, v) for (etype, state_key), v in state_dict.items()] def _decode_state_dict(input): diff --git a/synapse/events/utils.py b/synapse/events/utils.py index dd340be9a7..f6b507977f 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -16,8 +16,6 @@ import collections import re from typing import Any, Mapping, Union -from six import string_types - from frozendict import frozendict from twisted.internet import defer @@ -318,7 +316,7 @@ def serialize_event( if only_event_fields: if not isinstance(only_event_fields, list) or not all( - isinstance(f, string_types) for f in only_event_fields + isinstance(f, str) for f in only_event_fields ): raise TypeError("only_event_fields must be a list of strings") d = only_fields(d, only_event_fields) diff --git a/synapse/events/validator.py b/synapse/events/validator.py index b001c64bb4..588d222f36 100644 --- a/synapse/events/validator.py +++ b/synapse/events/validator.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from six import integer_types, string_types - from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes, Membership from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import EventFormatVersions @@ -53,7 +51,7 @@ class EventValidator(object): event_strings = ["origin"] for s in event_strings: - if not isinstance(getattr(event, s), string_types): + if not isinstance(getattr(event, s), str): raise SynapseError(400, "'%s' not a string type" % (s,)) # Depending on the room version, ensure the data is spec compliant JSON. @@ -90,7 +88,7 @@ class EventValidator(object): max_lifetime = event.content.get("max_lifetime") if min_lifetime is not None: - if not isinstance(min_lifetime, integer_types): + if not isinstance(min_lifetime, int): raise SynapseError( code=400, msg="'min_lifetime' must be an integer", @@ -124,7 +122,7 @@ class EventValidator(object): ) if max_lifetime is not None: - if not isinstance(max_lifetime, integer_types): + if not isinstance(max_lifetime, int): raise SynapseError( code=400, msg="'max_lifetime' must be an integer", @@ -183,7 +181,7 @@ class EventValidator(object): strings.append("state_key") for s in strings: - if not isinstance(getattr(event, s), string_types): + if not isinstance(getattr(event, s), str): raise SynapseError(400, "Not '%s' a string type" % (s,)) RoomID.from_string(event.room_id) @@ -223,7 +221,7 @@ class EventValidator(object): for s in keys: if s not in d: raise SynapseError(400, "'%s' not in content" % (s,)) - if not isinstance(d[s], string_types): + if not isinstance(d[s], str): raise SynapseError(400, "'%s' not a string type" % (s,)) def _ensure_state_event(self, event): diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index c0012c6872..420df2385f 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -17,8 +17,6 @@ import logging from collections import namedtuple from typing import Iterable, List -import six - from twisted.internet import defer from twisted.internet.defer import Deferred, DeferredList from twisted.python.failure import Failure @@ -93,8 +91,8 @@ class FederationBase(object): # *actual* redacted copy to be on the safe side.) redacted_event = prune_event(pdu) if set(redacted_event.keys()) == set(pdu.keys()) and set( - six.iterkeys(redacted_event.content) - ) == set(six.iterkeys(pdu.content)): + redacted_event.content.keys() + ) == set(pdu.content.keys()): logger.info( "Event %s seems to have been redacted; using our redacted " "copy", @@ -294,7 +292,7 @@ def event_from_pdu_json( assert_params_in_dict(pdu_json, ("type", "depth")) depth = pdu_json["depth"] - if not isinstance(depth, six.integer_types): + if not isinstance(depth, int): raise SynapseError(400, "Depth %r not an intger" % (depth,), Codes.BAD_JSON) if depth < 0: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 32a8a2ee46..e704cf2f44 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -17,11 +17,8 @@ import logging from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union -import six -from six import iteritems - from canonicaljson import json -from prometheus_client import Counter +from prometheus_client import Counter, Histogram from twisted.internet import defer from twisted.internet.abstract import isIPAddress @@ -73,6 +70,10 @@ received_queries_counter = Counter( "synapse_federation_server_received_queries", "", ["type"] ) +pdu_process_time = Histogram( + "synapse_federation_server_pdu_process_time", "Time taken to process an event", +) + class FederationServer(FederationBase): def __init__(self, hs): @@ -274,21 +275,22 @@ class FederationServer(FederationBase): for pdu in pdus_by_room[room_id]: event_id = pdu.event_id - with nested_logging_context(event_id): - try: - await self._handle_received_pdu(origin, pdu) - pdu_results[event_id] = {} - except FederationError as e: - logger.warning("Error handling PDU %s: %s", event_id, e) - pdu_results[event_id] = {"error": str(e)} - except Exception as e: - f = failure.Failure() - pdu_results[event_id] = {"error": str(e)} - logger.error( - "Failed to handle PDU %s", - event_id, - exc_info=(f.type, f.value, f.getTracebackObject()), - ) + with pdu_process_time.time(): + with nested_logging_context(event_id): + try: + await self._handle_received_pdu(origin, pdu) + pdu_results[event_id] = {} + except FederationError as e: + logger.warning("Error handling PDU %s: %s", event_id, e) + pdu_results[event_id] = {"error": str(e)} + except Exception as e: + f = failure.Failure() + pdu_results[event_id] = {"error": str(e)} + logger.error( + "Failed to handle PDU %s", + event_id, + exc_info=(f.type, f.value, f.getTracebackObject()), + ) await concurrently_execute( process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT @@ -534,9 +536,9 @@ class FederationServer(FederationBase): ",".join( ( "%s for %s:%s" % (key_id, user_id, device_id) - for user_id, user_keys in iteritems(json_result) - for device_id, device_keys in iteritems(user_keys) - for key_id, _ in iteritems(device_keys) + for user_id, user_keys in json_result.items() + for device_id, device_keys in user_keys.items() + for key_id, _ in device_keys.items() ) ), ) @@ -752,7 +754,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool: def _acl_entry_matches(server_name: str, acl_entry: str) -> Match: - if not isinstance(acl_entry, six.string_types): + if not isinstance(acl_entry, str): logger.warning( "Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry) ) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 52f4f54215..6bbd762681 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -33,8 +33,6 @@ import logging from collections import namedtuple from typing import Dict, List, Tuple, Type -from six import iteritems - from sortedcontainers import SortedDict from twisted.internet import defer @@ -327,7 +325,7 @@ class FederationRemoteSendQueue(object): # stream position. keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]} - for ((destination, edu_key), pos) in iteritems(keyed_edus): + for ((destination, edu_key), pos) in keyed_edus.items(): rows.append( ( pos, @@ -530,10 +528,10 @@ def process_rows_for_federation(transaction_queue, rows): states=[state], destinations=destinations ) - for destination, edu_map in iteritems(buff.keyed_edus): + for destination, edu_map in buff.keyed_edus.items(): for key, edu in edu_map.items(): transaction_queue.send_edu(edu, key) - for destination, edu_list in iteritems(buff.edus): + for destination, edu_list in buff.edus.items(): for edu in edu_list: transaction_queue.send_edu(edu, None) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index d473576902..23fb515683 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -16,8 +16,6 @@ import logging from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple -from six import itervalues - from prometheus_client import Counter from twisted.internet import defer @@ -203,7 +201,15 @@ class FederationSender(object): logger.debug("Sending %s to %r", event, destinations) - self._send_pdu(event, destinations) + if destinations: + self._send_pdu(event, destinations) + + now = self.clock.time_msec() + ts = await self.store.get_received_ts(event.event_id) + + synapse.metrics.event_processing_lag_by_event.labels( + "federation_sender" + ).observe(now - ts) async def handle_room_events(events: Iterable[EventBase]) -> None: with Measure(self.clock, "handle_room_events"): @@ -218,7 +224,7 @@ class FederationSender(object): defer.gatherResults( [ run_in_background(handle_room_events, evs) - for evs in itervalues(events_by_room) + for evs in events_by_room.values() ], consumeErrors=True, ) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 060bf07197..9f99311419 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -15,10 +15,9 @@ # limitations under the License. import logging +import urllib from typing import Any, Dict, Optional -from six.moves import urllib - from twisted.internet import defer from synapse.api.constants import Membership diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 8a9de913b3..8db8ab1b7b 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -17,8 +17,6 @@ import logging -from six import string_types - from synapse.api.errors import Codes, SynapseError from synapse.types import GroupID, RoomID, UserID, get_domain_from_id from synapse.util.async_helpers import concurrently_execute @@ -513,7 +511,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler): for keyname in ("name", "avatar_url", "short_description", "long_description"): if keyname in content: value = content[keyname] - if not isinstance(value, string_types): + if not isinstance(value, str): raise SynapseError(400, "%r value is not a string" % (keyname,)) profile[keyname] = value diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index fe62f78e67..f7d9fd621e 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -15,8 +15,6 @@ import logging -from six import itervalues - from prometheus_client import Counter from twisted.internet import defer @@ -116,6 +114,12 @@ class ApplicationServicesHandler(object): for service in services: self.scheduler.submit_event_for_as(service, event) + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(event.event_id) + synapse.metrics.event_processing_lag_by_event.labels( + "appservice_sender" + ).observe(now - ts) + @defer.inlineCallbacks def handle_room_events(events): for event in events: @@ -125,7 +129,7 @@ class ApplicationServicesHandler(object): defer.gatherResults( [ run_in_background(handle_room_events, evs) - for evs in itervalues(events_by_room) + for evs in events_by_room.values() ], consumeErrors=True, ) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index bb3b43d5ae..c3f86e7414 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -297,7 +297,7 @@ class AuthHandler(BaseHandler): # Convert the URI and method to strings. uri = request.uri.decode("utf-8") - method = request.uri.decode("utf-8") + method = request.method.decode("utf-8") # If there's no session ID, create a new session. if not sid: diff --git a/synapse/handlers/cas_handler.py b/synapse/handlers/cas_handler.py index 64aaa1335c..76f213723a 100644 --- a/synapse/handlers/cas_handler.py +++ b/synapse/handlers/cas_handler.py @@ -14,11 +14,10 @@ # limitations under the License. import logging +import urllib import xml.etree.ElementTree as ET from typing import Dict, Optional, Tuple -from six.moves import urllib - from twisted.web.client import PartialDownloadError from synapse.api.errors import Codes, LoginError diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 230d170258..31346b56c3 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -17,8 +17,6 @@ import logging from typing import Any, Dict, Optional -from six import iteritems, itervalues - from twisted.internet import defer from synapse.api import errors @@ -159,7 +157,7 @@ class DeviceWorkerHandler(BaseHandler): # The user may have left the room # TODO: Check if they actually did or if we were just invited. if room_id not in room_ids: - for key, event_id in iteritems(current_state_ids): + for key, event_id in current_state_ids.items(): etype, state_key = key if etype != EventTypes.Member: continue @@ -182,7 +180,7 @@ class DeviceWorkerHandler(BaseHandler): log_kv( {"event": "encountered empty previous state", "room_id": room_id} ) - for key, event_id in iteritems(current_state_ids): + for key, event_id in current_state_ids.items(): etype, state_key = key if etype != EventTypes.Member: continue @@ -198,10 +196,10 @@ class DeviceWorkerHandler(BaseHandler): # Check if we've joined the room? If so we just blindly add all the users to # the "possibly changed" users. - for state_dict in itervalues(prev_state_ids): + for state_dict in prev_state_ids.values(): member_event = state_dict.get((EventTypes.Member, user_id), None) if not member_event or member_event != current_member_id: - for key, event_id in iteritems(current_state_ids): + for key, event_id in current_state_ids.items(): etype, state_key = key if etype != EventTypes.Member: continue @@ -211,14 +209,14 @@ class DeviceWorkerHandler(BaseHandler): # If there has been any change in membership, include them in the # possibly changed list. We'll check if they are joined below, # and we're not toooo worried about spuriously adding users. - for key, event_id in iteritems(current_state_ids): + for key, event_id in current_state_ids.items(): etype, state_key = key if etype != EventTypes.Member: continue # check if this member has changed since any of the extremities # at the stream_ordering, and add them to the list if so. - for state_dict in itervalues(prev_state_ids): + for state_dict in prev_state_ids.values(): prev_event_id = state_dict.get(key, None) if not prev_event_id or prev_event_id != event_id: if state_key != user_id: @@ -693,6 +691,7 @@ class DeviceListUpdater(object): return False + @trace @defer.inlineCallbacks def _maybe_retry_device_resync(self): """Retry to resync device lists that are out of sync, except if another retry is diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 05c4b3eec0..610b08d00b 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -18,8 +18,6 @@ from typing import Any, Dict from canonicaljson import json -from twisted.internet import defer - from synapse.api.errors import SynapseError from synapse.logging.context import run_in_background from synapse.logging.opentracing import ( @@ -51,8 +49,7 @@ class DeviceMessageHandler(object): self._device_list_updater = hs.get_device_handler().device_list_updater - @defer.inlineCallbacks - def on_direct_to_device_edu(self, origin, content): + async def on_direct_to_device_edu(self, origin, content): local_messages = {} sender_user_id = content["sender"] if origin != get_domain_from_id(sender_user_id): @@ -82,11 +79,11 @@ class DeviceMessageHandler(object): } local_messages[user_id] = messages_by_device - yield self._check_for_unknown_devices( + await self._check_for_unknown_devices( message_type, sender_user_id, by_device ) - stream_id = yield self.store.add_messages_from_remote_to_device_inbox( + stream_id = await self.store.add_messages_from_remote_to_device_inbox( origin, message_id, local_messages ) @@ -94,14 +91,13 @@ class DeviceMessageHandler(object): "to_device_key", stream_id, users=local_messages.keys() ) - @defer.inlineCallbacks - def _check_for_unknown_devices( + async def _check_for_unknown_devices( self, message_type: str, sender_user_id: str, by_device: Dict[str, Dict[str, Any]], ): - """Checks inbound device messages for unkown remote devices, and if + """Checks inbound device messages for unknown remote devices, and if found marks the remote cache for the user as stale. """ @@ -115,7 +111,7 @@ class DeviceMessageHandler(object): requesting_device_ids.add(device_id) # Check if we are tracking the devices of the remote user. - room_ids = yield self.store.get_rooms_for_user(sender_user_id) + room_ids = await self.store.get_rooms_for_user(sender_user_id) if not room_ids: logger.info( "Received device message from remote device we don't" @@ -127,7 +123,7 @@ class DeviceMessageHandler(object): # If we are tracking check that we know about the sending # devices. - cached_devices = yield self.store.get_cached_devices_for_user(sender_user_id) + cached_devices = await self.store.get_cached_devices_for_user(sender_user_id) unknown_devices = requesting_device_ids - set(cached_devices) if unknown_devices: @@ -136,15 +132,14 @@ class DeviceMessageHandler(object): sender_user_id, unknown_devices, ) - yield self.store.mark_remote_user_device_cache_as_stale(sender_user_id) + await self.store.mark_remote_user_device_cache_as_stale(sender_user_id) # Immediately attempt a resync in the background run_in_background( self._device_list_updater.user_device_resync, sender_user_id ) - @defer.inlineCallbacks - def send_device_message(self, sender_user_id, message_type, messages): + async def send_device_message(self, sender_user_id, message_type, messages): set_tag("number_of_messages", len(messages)) set_tag("sender", sender_user_id) local_messages = {} @@ -183,7 +178,7 @@ class DeviceMessageHandler(object): } log_kv({"local_messages": local_messages}) - stream_id = yield self.store.add_messages_to_device_inbox( + stream_id = await self.store.add_messages_to_device_inbox( local_messages, remote_edu_contents ) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index f2f16b1e43..79a2df6201 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -17,8 +17,6 @@ import logging import string from typing import Iterable, List, Optional -from twisted.internet import defer - from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes from synapse.api.errors import ( AuthError, @@ -55,8 +53,7 @@ class DirectoryHandler(BaseHandler): self.spam_checker = hs.get_spam_checker() - @defer.inlineCallbacks - def _create_association( + async def _create_association( self, room_alias: RoomAlias, room_id: str, @@ -76,13 +73,13 @@ class DirectoryHandler(BaseHandler): # TODO(erikj): Add transactions. # TODO(erikj): Check if there is a current association. if not servers: - users = yield self.state.get_current_users_in_room(room_id) + users = await self.state.get_current_users_in_room(room_id) servers = {get_domain_from_id(u) for u in users} if not servers: raise SynapseError(400, "Failed to get server list") - yield self.store.create_room_alias_association( + await self.store.create_room_alias_association( room_alias, room_id, servers, creator=creator ) @@ -93,7 +90,7 @@ class DirectoryHandler(BaseHandler): room_id: str, servers: Optional[List[str]] = None, check_membership: bool = True, - ): + ) -> None: """Attempt to create a new alias Args: @@ -103,9 +100,6 @@ class DirectoryHandler(BaseHandler): servers: Iterable of servers that others servers should try and join via check_membership: Whether to check if the user is in the room before the alias can be set (if the server's config requires it). - - Returns: - Deferred """ user_id = requester.user.to_string() @@ -148,7 +142,7 @@ class DirectoryHandler(BaseHandler): # per alias creation rule? raise SynapseError(403, "Not allowed to create alias") - can_create = await self.can_modify_alias(room_alias, user_id=user_id) + can_create = self.can_modify_alias(room_alias, user_id=user_id) if not can_create: raise AuthError( 400, @@ -158,7 +152,9 @@ class DirectoryHandler(BaseHandler): await self._create_association(room_alias, room_id, servers, creator=user_id) - async def delete_association(self, requester: Requester, room_alias: RoomAlias): + async def delete_association( + self, requester: Requester, room_alias: RoomAlias + ) -> str: """Remove an alias from the directory (this is only meant for human users; AS users should call @@ -169,7 +165,7 @@ class DirectoryHandler(BaseHandler): room_alias Returns: - Deferred[unicode]: room id that the alias used to point to + room id that the alias used to point to Raises: NotFoundError: if the alias doesn't exist @@ -191,7 +187,7 @@ class DirectoryHandler(BaseHandler): if not can_delete: raise AuthError(403, "You don't have permission to delete the alias.") - can_delete = await self.can_modify_alias(room_alias, user_id=user_id) + can_delete = self.can_modify_alias(room_alias, user_id=user_id) if not can_delete: raise SynapseError( 400, @@ -208,8 +204,7 @@ class DirectoryHandler(BaseHandler): return room_id - @defer.inlineCallbacks - def delete_appservice_association( + async def delete_appservice_association( self, service: ApplicationService, room_alias: RoomAlias ): if not service.is_interested_in_alias(room_alias.to_string()): @@ -218,29 +213,27 @@ class DirectoryHandler(BaseHandler): "This application service has not reserved this kind of alias", errcode=Codes.EXCLUSIVE, ) - yield self._delete_association(room_alias) + await self._delete_association(room_alias) - @defer.inlineCallbacks - def _delete_association(self, room_alias: RoomAlias): + async def _delete_association(self, room_alias: RoomAlias): if not self.hs.is_mine(room_alias): raise SynapseError(400, "Room alias must be local") - room_id = yield self.store.delete_room_alias(room_alias) + room_id = await self.store.delete_room_alias(room_alias) return room_id - @defer.inlineCallbacks - def get_association(self, room_alias: RoomAlias): + async def get_association(self, room_alias: RoomAlias): room_id = None if self.hs.is_mine(room_alias): - result = yield self.get_association_from_room_alias(room_alias) + result = await self.get_association_from_room_alias(room_alias) if result: room_id = result.room_id servers = result.servers else: try: - result = yield self.federation.make_query( + result = await self.federation.make_query( destination=room_alias.domain, query_type="directory", args={"room_alias": room_alias.to_string()}, @@ -265,7 +258,7 @@ class DirectoryHandler(BaseHandler): Codes.NOT_FOUND, ) - users = yield self.state.get_current_users_in_room(room_id) + users = await self.state.get_current_users_in_room(room_id) extra_servers = {get_domain_from_id(u) for u in users} servers = set(extra_servers) | set(servers) @@ -277,13 +270,12 @@ class DirectoryHandler(BaseHandler): return {"room_id": room_id, "servers": servers} - @defer.inlineCallbacks - def on_directory_query(self, args): + async def on_directory_query(self, args): room_alias = RoomAlias.from_string(args["room_alias"]) if not self.hs.is_mine(room_alias): raise SynapseError(400, "Room Alias is not hosted on this homeserver") - result = yield self.get_association_from_room_alias(room_alias) + result = await self.get_association_from_room_alias(room_alias) if result is not None: return {"room_id": result.room_id, "servers": result.servers} @@ -344,16 +336,15 @@ class DirectoryHandler(BaseHandler): ratelimit=False, ) - @defer.inlineCallbacks - def get_association_from_room_alias(self, room_alias: RoomAlias): - result = yield self.store.get_association_from_room_alias(room_alias) + async def get_association_from_room_alias(self, room_alias: RoomAlias): + result = await self.store.get_association_from_room_alias(room_alias) if not result: # Query AS to see if it exists as_handler = self.appservice_handler - result = yield as_handler.query_room_alias_exists(room_alias) + result = await as_handler.query_room_alias_exists(room_alias) return result - def can_modify_alias(self, alias: RoomAlias, user_id: Optional[str] = None): + def can_modify_alias(self, alias: RoomAlias, user_id: Optional[str] = None) -> bool: # Any application service "interested" in an alias they are regexing on # can modify the alias. # Users can only modify the alias if ALL the interested services have @@ -366,12 +357,12 @@ class DirectoryHandler(BaseHandler): for service in interested_services: if user_id == service.sender: # this user IS the app service so they can do whatever they like - return defer.succeed(True) + return True elif service.is_exclusive_alias(alias.to_string()): # another service has an exclusive lock on this alias. - return defer.succeed(False) + return False # either no interested services, or no service with an exclusive lock - return defer.succeed(True) + return True async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str): """Determine whether a user can delete an alias. @@ -459,8 +450,7 @@ class DirectoryHandler(BaseHandler): await self.store.set_room_is_public(room_id, making_public) - @defer.inlineCallbacks - def edit_published_appservice_room_list( + async def edit_published_appservice_room_list( self, appservice_id: str, network_id: str, room_id: str, visibility: str ): """Add or remove a room from the appservice/network specific public @@ -475,7 +465,7 @@ class DirectoryHandler(BaseHandler): if visibility not in ["public", "private"]: raise SynapseError(400, "Invalid visibility setting") - yield self.store.set_room_is_public_appservice( + await self.store.set_room_is_public_appservice( room_id, appservice_id, network_id, visibility == "public" ) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 774a252619..a7e60cbc26 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -17,8 +17,6 @@ import logging -from six import iteritems - import attr from canonicaljson import encode_canonical_json, json from signedjson.key import decode_verify_key_bytes @@ -135,7 +133,7 @@ class E2eKeysHandler(object): remote_queries_not_in_cache = {} if remote_queries: query_list = [] - for user_id, device_ids in iteritems(remote_queries): + for user_id, device_ids in remote_queries.items(): if device_ids: query_list.extend((user_id, device_id) for device_id in device_ids) else: @@ -145,9 +143,9 @@ class E2eKeysHandler(object): user_ids_not_in_cache, remote_results, ) = yield self.store.get_user_devices_from_cache(query_list) - for user_id, devices in iteritems(remote_results): + for user_id, devices in remote_results.items(): user_devices = results.setdefault(user_id, {}) - for device_id, device in iteritems(devices): + for device_id, device in devices.items(): keys = device.get("keys", None) device_display_name = device.get("device_display_name", None) if keys: @@ -446,9 +444,9 @@ class E2eKeysHandler(object): ",".join( ( "%s for %s:%s" % (key_id, user_id, device_id) - for user_id, user_keys in iteritems(json_result) - for device_id, device_keys in iteritems(user_keys) - for key_id, _ in iteritems(device_keys) + for user_id, user_keys in json_result.items() + for device_id, device_keys in user_keys.items() + for key_id, _ in device_keys.items() ) ), ) diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 9abaf13b8f..f55470a707 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -16,8 +16,6 @@ import logging -from six import iteritems - from twisted.internet import defer from synapse.api.errors import ( @@ -205,8 +203,8 @@ class E2eRoomKeysHandler(object): ) to_insert = [] # batch the inserts together changed = False # if anything has changed, we need to update the etag - for room_id, room in iteritems(room_keys["rooms"]): - for session_id, room_key in iteritems(room["sessions"]): + for room_id, room in room_keys["rooms"].items(): + for session_id, room_key in room["sessions"].items(): if not isinstance(room_key["is_verified"], bool): msg = ( "is_verified must be a boolean in keys for session %s in" @@ -351,6 +349,7 @@ class E2eRoomKeysHandler(object): raise res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"]) + res["etag"] = str(res["etag"]) return res @trace diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d0b62f4cf2..b5aaa244dd 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -19,11 +19,9 @@ import itertools import logging -from typing import Dict, Iterable, List, Optional, Sequence, Tuple - -import six -from six import iteritems, itervalues -from six.moves import http_client, zip +from collections import Container +from http import HTTPStatus +from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union import attr from signedjson.key import decode_verify_key_bytes @@ -33,7 +31,12 @@ from unpaddedbase64 import decode_base64 from twisted.internet import defer from synapse import event_auth -from synapse.api.constants import EventTypes, Membership, RejectedReason +from synapse.api.constants import ( + EventTypes, + Membership, + RejectedReason, + RoomEncryptionAlgorithms, +) from synapse.api.errors import ( AuthError, CodeMessageException, @@ -374,6 +377,7 @@ class FederationHandler(BaseHandler): room_version = await self.store.get_room_version_id(room_id) state_map = await resolve_events_with_store( + self.clock, room_id, room_version, state_maps, @@ -393,7 +397,7 @@ class FederationHandler(BaseHandler): ) event_map.update(evs) - state = [event_map[e] for e in six.itervalues(state_map)] + state = [event_map[e] for e in state_map.values()] except Exception: logger.warning( "[%s %s] Error attempting to resolve state at missing " @@ -739,10 +743,16 @@ class FederationHandler(BaseHandler): # device and recognize the algorithm then we can work out the # exact key to expect. Otherwise check it matches any key we # have for that device. + + current_keys = [] # type: Container[str] + if device: keys = device.get("keys", {}).get("keys", {}) - if event.content.get("algorithm") == "m.megolm.v1.aes-sha2": + if ( + event.content.get("algorithm") + == RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2 + ): # For this algorithm we expect a curve25519 key. key_name = "curve25519:%s" % (device_id,) current_keys = [keys.get(key_name)] @@ -752,15 +762,15 @@ class FederationHandler(BaseHandler): current_keys = keys.values() elif device_id: # We don't have any keys for the device ID. - current_keys = [] + pass else: # The event didn't include a device ID, so we just look for # keys across all devices. - current_keys = ( + current_keys = [ key for device in cached_devices for key in device.get("keys", {}).get("keys", {}).values() - ) + ] # We now check that the sender key matches (one of) the expected # keys. @@ -1001,11 +1011,11 @@ class FederationHandler(BaseHandler): """ joined_users = [ (state_key, int(event.depth)) - for (e_type, state_key), event in iteritems(state) + for (e_type, state_key), event in state.items() if e_type == EventTypes.Member and event.membership == Membership.JOIN ] - joined_domains = {} + joined_domains = {} # type: Dict[str, int] for u, d in joined_users: try: dom = get_domain_from_id(u) @@ -1091,16 +1101,16 @@ class FederationHandler(BaseHandler): states = dict(zip(event_ids, [s.state for s in states])) state_map = await self.store.get_events( - [e_id for ids in itervalues(states) for e_id in itervalues(ids)], + [e_id for ids in states.values() for e_id in ids.values()], get_prev_content=False, ) states = { key: { k: state_map[e_id] - for k, e_id in iteritems(state_dict) + for k, e_id in state_dict.items() if e_id in state_map } - for key, state_dict in iteritems(states) + for key, state_dict in states.items() } for e_id, _ in sorted_extremeties_tuple: @@ -1188,7 +1198,7 @@ class FederationHandler(BaseHandler): ev.event_id, len(ev.prev_event_ids()), ) - raise SynapseError(http_client.BAD_REQUEST, "Too many prev_events") + raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events") if len(ev.auth_event_ids()) > 10: logger.warning( @@ -1196,7 +1206,7 @@ class FederationHandler(BaseHandler): ev.event_id, len(ev.auth_event_ids()), ) - raise SynapseError(http_client.BAD_REQUEST, "Too many auth_events") + raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events") async def send_invite(self, target_host, event): """ Sends the invite to the remote server for signing. @@ -1271,14 +1281,15 @@ class FederationHandler(BaseHandler): try: # Try the host we successfully got a response to /make_join/ # request first. + host_list = list(target_hosts) try: - target_hosts.remove(origin) - target_hosts.insert(0, origin) + host_list.remove(origin) + host_list.insert(0, origin) except ValueError: pass ret = await self.federation_client.send_join( - target_hosts, event, room_version_obj + host_list, event, room_version_obj ) origin = ret["origin"] @@ -1539,7 +1550,7 @@ class FederationHandler(BaseHandler): # block any attempts to invite the server notices mxid if event.state_key == self._server_notices_mxid: - raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user") + raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user") # keep a record of the room version, if we don't yet know it. # (this may get overwritten if we later get a different room version in a @@ -1578,13 +1589,14 @@ class FederationHandler(BaseHandler): # Try the host that we succesfully called /make_leave/ on first for # the /send_leave/ request. + host_list = list(target_hosts) try: - target_hosts.remove(origin) - target_hosts.insert(0, origin) + host_list.remove(origin) + host_list.insert(0, origin) except ValueError: pass - await self.federation_client.send_leave(target_hosts, event) + await self.federation_client.send_leave(host_list, event) context = await self.state_handler.compute_event_context(event) stream_id = await self.persist_events_and_notify([(event, context)]) @@ -1598,7 +1610,7 @@ class FederationHandler(BaseHandler): user_id: str, membership: str, content: JsonDict = {}, - params: Optional[Dict[str, str]] = None, + params: Optional[Dict[str, Union[str, Iterable[str]]]] = None, ) -> Tuple[str, EventBase, RoomVersion]: ( origin, @@ -1725,7 +1737,7 @@ class FederationHandler(BaseHandler): state_groups = await self.state_store.get_state_groups(room_id, [event_id]) if state_groups: - _, state = list(iteritems(state_groups)).pop() + _, state = list(state_groups.items()).pop() results = {(e.type, e.state_key): e for e in state} if event.is_state(): @@ -2012,8 +2024,8 @@ class FederationHandler(BaseHandler): auth_events_ids = await self.auth.compute_auth_events( event, prev_state_ids, for_verification=True ) - auth_events = await self.store.get_events(auth_events_ids) - auth_events = {(e.type, e.state_key): e for e in auth_events.values()} + auth_events_x = await self.store.get_events(auth_events_ids) + auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()} # This is a hack to fix some old rooms where the initial join event # didn't reference the create event in its auth events. @@ -2049,76 +2061,67 @@ class FederationHandler(BaseHandler): # For new (non-backfilled and non-outlier) events we check if the event # passes auth based on the current state. If it doesn't then we # "soft-fail" the event. - do_soft_fail_check = not backfilled and not event.internal_metadata.is_outlier() - if do_soft_fail_check: - extrem_ids = await self.store.get_latest_event_ids_in_room(event.room_id) - - extrem_ids = set(extrem_ids) - prev_event_ids = set(event.prev_event_ids()) - - if extrem_ids == prev_event_ids: - # If they're the same then the current state is the same as the - # state at the event, so no point rechecking auth for soft fail. - do_soft_fail_check = False - - if do_soft_fail_check: - room_version = await self.store.get_room_version_id(event.room_id) - room_version_obj = KNOWN_ROOM_VERSIONS[room_version] - - # Calculate the "current state". - if state is not None: - # If we're explicitly given the state then we won't have all the - # prev events, and so we have a gap in the graph. In this case - # we want to be a little careful as we might have been down for - # a while and have an incorrect view of the current state, - # however we still want to do checks as gaps are easy to - # maliciously manufacture. - # - # So we use a "current state" that is actually a state - # resolution across the current forward extremities and the - # given state at the event. This should correctly handle cases - # like bans, especially with state res v2. + if backfilled or event.internal_metadata.is_outlier(): + return - state_sets = await self.state_store.get_state_groups( - event.room_id, extrem_ids - ) - state_sets = list(state_sets.values()) - state_sets.append(state) - current_state_ids = await self.state_handler.resolve_events( - room_version, state_sets, event - ) - current_state_ids = { - k: e.event_id for k, e in iteritems(current_state_ids) - } - else: - current_state_ids = await self.state_handler.get_current_state_ids( - event.room_id, latest_event_ids=extrem_ids - ) + extrem_ids = await self.store.get_latest_event_ids_in_room(event.room_id) + extrem_ids = set(extrem_ids) + prev_event_ids = set(event.prev_event_ids()) - logger.debug( - "Doing soft-fail check for %s: state %s", - event.event_id, - current_state_ids, + if extrem_ids == prev_event_ids: + # If they're the same then the current state is the same as the + # state at the event, so no point rechecking auth for soft fail. + return + + room_version = await self.store.get_room_version_id(event.room_id) + room_version_obj = KNOWN_ROOM_VERSIONS[room_version] + + # Calculate the "current state". + if state is not None: + # If we're explicitly given the state then we won't have all the + # prev events, and so we have a gap in the graph. In this case + # we want to be a little careful as we might have been down for + # a while and have an incorrect view of the current state, + # however we still want to do checks as gaps are easy to + # maliciously manufacture. + # + # So we use a "current state" that is actually a state + # resolution across the current forward extremities and the + # given state at the event. This should correctly handle cases + # like bans, especially with state res v2. + + state_sets = await self.state_store.get_state_groups( + event.room_id, extrem_ids + ) + state_sets = list(state_sets.values()) + state_sets.append(state) + current_state_ids = await self.state_handler.resolve_events( + room_version, state_sets, event + ) + current_state_ids = {k: e.event_id for k, e in current_state_ids.items()} + else: + current_state_ids = await self.state_handler.get_current_state_ids( + event.room_id, latest_event_ids=extrem_ids ) - # Now check if event pass auth against said current state - auth_types = auth_types_for_event(event) - current_state_ids = [ - e for k, e in iteritems(current_state_ids) if k in auth_types - ] + logger.debug( + "Doing soft-fail check for %s: state %s", event.event_id, current_state_ids, + ) - current_auth_events = await self.store.get_events(current_state_ids) - current_auth_events = { - (e.type, e.state_key): e for e in current_auth_events.values() - } + # Now check if event pass auth against said current state + auth_types = auth_types_for_event(event) + current_state_ids = [e for k, e in current_state_ids.items() if k in auth_types] - try: - event_auth.check( - room_version_obj, event, auth_events=current_auth_events - ) - except AuthError as e: - logger.warning("Soft-failing %r because %s", event, e) - event.internal_metadata.soft_failed = True + current_auth_events = await self.store.get_events(current_state_ids) + current_auth_events = { + (e.type, e.state_key): e for e in current_auth_events.values() + } + + try: + event_auth.check(room_version_obj, event, auth_events=current_auth_events) + except AuthError as e: + logger.warning("Soft-failing %r because %s", event, e) + event.internal_metadata.soft_failed = True async def on_query_auth( self, origin, event_id, room_id, remote_auth_chain, rejects, missing @@ -2287,10 +2290,10 @@ class FederationHandler(BaseHandler): remote_auth_chain = await self.federation_client.get_event_auth( origin, event.room_id, event.event_id ) - except RequestSendFailed as e: + except RequestSendFailed as e1: # The other side isn't around or doesn't implement the # endpoint, so lets just bail out. - logger.info("Failed to get event auth from remote: %s", e) + logger.info("Failed to get event auth from remote: %s", e1) return context seen_remotes = await self.store.have_seen_events( @@ -2420,7 +2423,7 @@ class FederationHandler(BaseHandler): else: event_key = None state_updates = { - k: a.event_id for k, a in iteritems(auth_events) if k != event_key + k: a.event_id for k, a in auth_events.items() if k != event_key } current_state_ids = await context.get_current_state_ids() @@ -2431,7 +2434,7 @@ class FederationHandler(BaseHandler): prev_state_ids = await context.get_prev_state_ids() prev_state_ids = dict(prev_state_ids) - prev_state_ids.update({k: a.event_id for k, a in iteritems(auth_events)}) + prev_state_ids.update({k: a.event_id for k, a in auth_events.items()}) # create a new state group as a delta from the existing one. prev_group = context.state_group @@ -2768,7 +2771,8 @@ class FederationHandler(BaseHandler): logger.debug("Checking auth on event %r", event.content) - last_exception = None + last_exception = None # type: Optional[Exception] + # for each public key in the 3pid invite event for public_key_object in self.hs.get_auth().get_public_keys(invite_event): try: @@ -2822,6 +2826,12 @@ class FederationHandler(BaseHandler): return except Exception as e: last_exception = e + + if last_exception is None: + # we can only get here if get_public_keys() returned an empty list + # TODO: make this better + raise RuntimeError("no public key in invite event") + raise last_exception async def _check_key_revocation(self, public_key, url): diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index ebe8d25bd8..7cb106e365 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -16,8 +16,6 @@ import logging -from six import iteritems - from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError from synapse.types import get_domain_from_id @@ -227,7 +225,7 @@ class GroupsLocalWorkerHandler(object): results = {} failed_results = [] - for destination, dest_user_ids in iteritems(destinations): + for destination, dest_user_ids in destinations.items(): try: r = await self.transport_client.bulk_get_publicised_groups( destination, list(dest_user_ids) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 649ca1f08a..665ad19b5d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -17,8 +17,6 @@ import logging from typing import Optional, Tuple -from six import iteritems, itervalues, string_types - from canonicaljson import encode_canonical_json, json from twisted.internet import defer @@ -246,7 +244,7 @@ class MessageHandler(object): "avatar_url": profile.avatar_url, "display_name": profile.display_name, } - for user_id, profile in iteritems(users_with_profile) + for user_id, profile in users_with_profile.items() } def maybe_schedule_expiry(self, event): @@ -715,7 +713,7 @@ class EventCreationHandler(object): spam_error = self.spam_checker.check_event_for_spam(event) if spam_error: - if not isinstance(spam_error, string_types): + if not isinstance(spam_error, str): spam_error = "Spam is not permitted here" raise SynapseError(403, spam_error, Codes.FORBIDDEN) @@ -881,7 +879,9 @@ class EventCreationHandler(object): """ room_alias = RoomAlias.from_string(room_alias_str) try: - mapping = yield directory_handler.get_association(room_alias) + mapping = yield defer.ensureDeferred( + directory_handler.get_association(room_alias) + ) except SynapseError as e: # Turn M_NOT_FOUND errors into M_BAD_ALIAS errors. if e.errcode == Codes.NOT_FOUND: @@ -988,7 +988,7 @@ class EventCreationHandler(object): state_to_include_ids = [ e_id - for k, e_id in iteritems(current_state_ids) + for k, e_id in current_state_ids.items() if k[0] in self.room_invite_state_types or k == (EventTypes.Member, event.sender) ] @@ -1002,7 +1002,7 @@ class EventCreationHandler(object): "content": e.content, "sender": e.sender, } - for e in itervalues(state_to_include) + for e in state_to_include.values() ] invitee = UserID.from_string(event.state_key) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index d7442c62a7..da06582d4b 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -15,9 +15,6 @@ # limitations under the License. import logging -from six import iteritems - -from twisted.internet import defer from twisted.python.failure import Failure from synapse.api.constants import EventTypes, Membership @@ -99,8 +96,7 @@ class PaginationHandler(object): job["longest_max_lifetime"], ) - @defer.inlineCallbacks - def purge_history_for_rooms_in_range(self, min_ms, max_ms): + async def purge_history_for_rooms_in_range(self, min_ms, max_ms): """Purge outdated events from rooms within the given retention range. If a default retention policy is defined in the server's configuration and its @@ -139,13 +135,13 @@ class PaginationHandler(object): include_null, ) - rooms = yield self.store.get_rooms_for_retention_period_in_range( + rooms = await self.store.get_rooms_for_retention_period_in_range( min_ms, max_ms, include_null ) logger.debug("[purge] Rooms to purge: %s", rooms) - for room_id, retention_policy in iteritems(rooms): + for room_id, retention_policy in rooms.items(): logger.info("[purge] Attempting to purge messages in room %s", room_id) if room_id in self._purges_in_progress_by_room: @@ -167,9 +163,9 @@ class PaginationHandler(object): # Figure out what token we should start purging at. ts = self.clock.time_msec() - max_lifetime - stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts) + stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts) - r = yield self.store.get_room_event_before_stream_ordering( + r = await self.store.get_room_event_before_stream_ordering( room_id, stream_ordering, ) if not r: @@ -229,8 +225,7 @@ class PaginationHandler(object): ) return purge_id - @defer.inlineCallbacks - def _purge_history(self, purge_id, room_id, token, delete_local_events): + async def _purge_history(self, purge_id, room_id, token, delete_local_events): """Carry out a history purge on a room. Args: @@ -239,14 +234,11 @@ class PaginationHandler(object): token (str): topological token to delete events before delete_local_events (bool): True to delete local events as well as remote ones - - Returns: - Deferred """ self._purges_in_progress_by_room.add(room_id) try: - with (yield self.pagination_lock.write(room_id)): - yield self.storage.purge_events.purge_history( + with await self.pagination_lock.write(room_id): + await self.storage.purge_events.purge_history( room_id, token, delete_local_events ) logger.info("[purge] complete") @@ -284,9 +276,7 @@ class PaginationHandler(object): await self.store.get_room_version_id(room_id) # first check that we have no users in this room - joined = await defer.maybeDeferred( - self.store.is_host_joined, room_id, self._server_name - ) + joined = await self.store.is_host_joined(room_id, self._server_name) if joined: raise SynapseError(400, "Users are still joined to this room") diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 3594f3b00f..d2f25ae12a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -25,9 +25,7 @@ The methods that define policy are: import abc import logging from contextlib import contextmanager -from typing import Dict, Iterable, List, Set - -from six import iteritems, itervalues +from typing import Dict, Iterable, List, Set, Tuple from prometheus_client import Counter from typing_extensions import ContextManager @@ -170,14 +168,14 @@ class BasePresenceHandler(abc.ABC): for user_id in user_ids } - missing = [user_id for user_id, state in iteritems(states) if not state] + missing = [user_id for user_id, state in states.items() if not state] if missing: # There are things not in our in memory cache. Lets pull them out of # the database. res = await self.store.get_presence_for_users(missing) states.update(res) - missing = [user_id for user_id, state in iteritems(states) if not state] + missing = [user_id for user_id, state in states.items() if not state] if missing: new = { user_id: UserPresenceState.default(user_id) for user_id in missing @@ -632,7 +630,7 @@ class PresenceHandler(BasePresenceHandler): await self._update_states( [ prev_state.copy_and_replace(last_user_sync_ts=time_now_ms) - for prev_state in itervalues(prev_states) + for prev_state in prev_states.values() ] ) self.external_process_last_updated_ms.pop(process_id, None) @@ -775,7 +773,9 @@ class PresenceHandler(BasePresenceHandler): return False - async def get_all_presence_updates(self, last_id, current_id, limit): + async def get_all_presence_updates( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, list]], int, bool]: """ Gets a list of presence update rows from between the given stream ids. Each row has: @@ -787,10 +787,31 @@ class PresenceHandler(BasePresenceHandler): - last_user_sync_ts(int) - status_msg(int) - currently_active(int) + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data """ + # TODO(markjh): replicate the unpersisted changes. # This could use the in-memory stores for recent changes. - rows = await self.store.get_all_presence_updates(last_id, current_id, limit) + rows = await self.store.get_all_presence_updates( + instance_name, last_id, current_id, limit + ) return rows def notify_new_event(self): @@ -1087,7 +1108,7 @@ class PresenceEventSource(object): return (list(updates.values()), max_token) else: return ( - [s for s in itervalues(updates) if s.state != PresenceState.OFFLINE], + [s for s in updates.values() if s.state != PresenceState.OFFLINE], max_token, ) @@ -1323,11 +1344,11 @@ def get_interested_remotes(store, states, state_handler): # hosts in those rooms. room_ids_to_states, users_to_states = yield get_interested_parties(store, states) - for room_id, states in iteritems(room_ids_to_states): + for room_id, states in room_ids_to_states.items(): hosts = yield state_handler.get_current_hosts_in_room(room_id) hosts_and_states.append((hosts, states)) - for user_id, states in iteritems(users_to_states): + for user_id, states in users_to_states.items(): host = get_domain_from_id(user_id) hosts_and_states.append(([host], states)) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 302efc1b9a..4b1e3073a8 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -15,8 +15,6 @@ import logging -from six import raise_from - from twisted.internet import defer from synapse.api.errors import ( @@ -84,7 +82,7 @@ class BaseProfileHandler(BaseHandler): ) return result except RequestSendFailed as e: - raise_from(SynapseError(502, "Failed to fetch profile"), e) + raise SynapseError(502, "Failed to fetch profile") from e except HttpResponseException as e: raise e.to_synapse_error() @@ -135,7 +133,7 @@ class BaseProfileHandler(BaseHandler): ignore_backoff=True, ) except RequestSendFailed as e: - raise_from(SynapseError(502, "Failed to fetch profile"), e) + raise SynapseError(502, "Failed to fetch profile") from e except HttpResponseException as e: raise e.to_synapse_error() @@ -212,7 +210,7 @@ class BaseProfileHandler(BaseHandler): ignore_backoff=True, ) except RequestSendFailed as e: - raise_from(SynapseError(502, "Failed to fetch profile"), e) + raise SynapseError(502, "Failed to fetch profile") from e except HttpResponseException as e: raise e.to_synapse_error() diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 51979ea43e..78c3772ac1 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -17,7 +17,7 @@ import logging from synapse import types -from synapse.api.constants import MAX_USERID_LENGTH, LoginType +from synapse.api.constants import MAX_USERID_LENGTH, EventTypes, JoinRules, LoginType from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError from synapse.config.server import is_threepid_reserved from synapse.http.servlet import assert_params_in_dict @@ -26,7 +26,8 @@ from synapse.replication.http.register import ( ReplicationPostRegisterActionsServlet, ReplicationRegisterServlet, ) -from synapse.types import RoomAlias, RoomID, UserID, create_requester +from synapse.storage.state import StateFilter +from synapse.types import RoomAlias, UserID, create_requester from synapse.util.async_helpers import Linearizer from ._base import BaseHandler @@ -270,51 +271,157 @@ class RegistrationHandler(BaseHandler): return user_id - async def _auto_join_rooms(self, user_id): - """Automatically joins users to auto join rooms - creating the room in the first place - if the user is the first to be created. + async def _create_and_join_rooms(self, user_id: str): + """ + Create the auto-join rooms and join or invite the user to them. + + This should only be called when the first "real" user registers. Args: - user_id(str): The user to join + user_id: The user to join """ - # auto-join the user to any rooms we're supposed to dump them into - fake_requester = create_requester(user_id) + # Getting the handlers during init gives a dependency loop. + room_creation_handler = self.hs.get_room_creation_handler() + room_member_handler = self.hs.get_room_member_handler() - # try to create the room if we're the first real user on the server. Note - # that an auto-generated support or bot user is not a real user and will never be - # the user to create the room - should_auto_create_rooms = False - is_real_user = await self.store.is_real_user(user_id) - if self.hs.config.autocreate_auto_join_rooms and is_real_user: - count = await self.store.count_real_users() - should_auto_create_rooms = count == 1 - for r in self.hs.config.auto_join_rooms: + # Generate a stub for how the rooms will be configured. + stub_config = { + "preset": self.hs.config.registration.autocreate_auto_join_room_preset, + } + + # If the configuration providers a user ID to create rooms with, use + # that instead of the first user registered. + requires_join = False + if self.hs.config.registration.auto_join_user_id: + fake_requester = create_requester( + self.hs.config.registration.auto_join_user_id + ) + + # If the room requires an invite, add the user to the list of invites. + if self.hs.config.registration.auto_join_room_requires_invite: + stub_config["invite"] = [user_id] + + # If the room is being created by a different user, the first user + # registered needs to join it. Note that in the case of an invitation + # being necessary this will occur after the invite was sent. + requires_join = True + else: + fake_requester = create_requester(user_id) + + # Choose whether to federate the new room. + if not self.hs.config.registration.autocreate_auto_join_rooms_federated: + stub_config["creation_content"] = {"m.federate": False} + + for r in self.hs.config.registration.auto_join_rooms: logger.info("Auto-joining %s to %s", user_id, r) + try: - if should_auto_create_rooms: - room_alias = RoomAlias.from_string(r) - if self.hs.hostname != room_alias.domain: - logger.warning( - "Cannot create room alias %s, " - "it does not match server domain", - r, - ) - else: - # create room expects the localpart of the room alias - room_alias_localpart = room_alias.localpart - - # getting the RoomCreationHandler during init gives a dependency - # loop - await self.hs.get_room_creation_handler().create_room( - fake_requester, - config={ - "preset": "public_chat", - "room_alias_name": room_alias_localpart, - }, + room_alias = RoomAlias.from_string(r) + + if self.hs.hostname != room_alias.domain: + logger.warning( + "Cannot create room alias %s, " + "it does not match server domain", + r, + ) + else: + # A shallow copy is OK here since the only key that is + # modified is room_alias_name. + config = stub_config.copy() + # create room expects the localpart of the room alias + config["room_alias_name"] = room_alias.localpart + + info, _ = await room_creation_handler.create_room( + fake_requester, config=config, ratelimit=False, + ) + + # If the room does not require an invite, but another user + # created it, then ensure the first user joins it. + if requires_join: + await room_member_handler.update_membership( + requester=create_requester(user_id), + target=UserID.from_string(user_id), + room_id=info["room_id"], + # Since it was just created, there are no remote hosts. + remote_room_hosts=[], + action="join", ratelimit=False, ) + + except ConsentNotGivenError as e: + # Technically not necessary to pull out this error though + # moving away from bare excepts is a good thing to do. + logger.error("Failed to join new user to %r: %r", r, e) + except Exception as e: + logger.error("Failed to join new user to %r: %r", r, e) + + async def _join_rooms(self, user_id: str): + """ + Join or invite the user to the auto-join rooms. + + Args: + user_id: The user to join + """ + room_member_handler = self.hs.get_room_member_handler() + + for r in self.hs.config.registration.auto_join_rooms: + logger.info("Auto-joining %s to %s", user_id, r) + + try: + room_alias = RoomAlias.from_string(r) + + if RoomAlias.is_valid(r): + ( + room_id, + remote_room_hosts, + ) = await room_member_handler.lookup_room_alias(room_alias) + room_id = room_id.to_string() else: - await self._join_user_to_room(fake_requester, r) + raise SynapseError( + 400, "%s was not legal room ID or room alias" % (r,) + ) + + # Calculate whether the room requires an invite or can be + # joined directly. Note that unless a join rule of public exists, + # it is treated as requiring an invite. + requires_invite = True + + state = await self.store.get_filtered_current_state_ids( + room_id, StateFilter.from_types([(EventTypes.JoinRules, "")]) + ) + + event_id = state.get((EventTypes.JoinRules, "")) + if event_id: + join_rules_event = await self.store.get_event( + event_id, allow_none=True + ) + if join_rules_event: + join_rule = join_rules_event.content.get("join_rule", None) + requires_invite = join_rule and join_rule != JoinRules.PUBLIC + + # Send the invite, if necessary. + if requires_invite: + await room_member_handler.update_membership( + requester=create_requester( + self.hs.config.registration.auto_join_user_id + ), + target=UserID.from_string(user_id), + room_id=room_id, + remote_room_hosts=remote_room_hosts, + action="invite", + ratelimit=False, + ) + + # Send the join. + await room_member_handler.update_membership( + requester=create_requester(user_id), + target=UserID.from_string(user_id), + room_id=room_id, + remote_room_hosts=remote_room_hosts, + action="join", + ratelimit=False, + ) + except ConsentNotGivenError as e: # Technically not necessary to pull out this error though # moving away from bare excepts is a good thing to do. @@ -322,6 +429,29 @@ class RegistrationHandler(BaseHandler): except Exception as e: logger.error("Failed to join new user to %r: %r", r, e) + async def _auto_join_rooms(self, user_id: str): + """Automatically joins users to auto join rooms - creating the room in the first place + if the user is the first to be created. + + Args: + user_id: The user to join + """ + # auto-join the user to any rooms we're supposed to dump them into + + # try to create the room if we're the first real user on the server. Note + # that an auto-generated support or bot user is not a real user and will never be + # the user to create the room + should_auto_create_rooms = False + is_real_user = await self.store.is_real_user(user_id) + if self.hs.config.registration.autocreate_auto_join_rooms and is_real_user: + count = await self.store.count_real_users() + should_auto_create_rooms = count == 1 + + if should_auto_create_rooms: + await self._create_and_join_rooms(user_id) + else: + await self._join_rooms(user_id) + async def post_consent_actions(self, user_id): """A series of registration actions that can only be carried out once consent has been granted @@ -392,30 +522,6 @@ class RegistrationHandler(BaseHandler): self._next_generated_user_id += 1 return str(id) - async def _join_user_to_room(self, requester, room_identifier): - room_member_handler = self.hs.get_room_member_handler() - if RoomID.is_valid(room_identifier): - room_id = room_identifier - elif RoomAlias.is_valid(room_identifier): - room_alias = RoomAlias.from_string(room_identifier) - room_id, remote_room_hosts = await room_member_handler.lookup_room_alias( - room_alias - ) - room_id = room_id.to_string() - else: - raise SynapseError( - 400, "%s was not legal room ID or room alias" % (room_identifier,) - ) - - await room_member_handler.update_membership( - requester=requester, - target=requester.user, - room_id=room_id, - remote_room_hosts=remote_room_hosts, - action="join", - ratelimit=False, - ) - def check_registration_ratelimit(self, address): """A simple helper method to check whether the registration rate limit has been hit for a given IP address diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 61db3ccc43..950a84acd0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -24,9 +24,12 @@ import string from collections import OrderedDict from typing import Tuple -from six import iteritems, string_types - -from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset +from synapse.api.constants import ( + EventTypes, + JoinRules, + RoomCreationPreset, + RoomEncryptionAlgorithms, +) from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion from synapse.events.utils import copy_power_levels_contents @@ -56,31 +59,6 @@ FIVE_MINUTES_IN_MS = 5 * 60 * 1000 class RoomCreationHandler(BaseHandler): - - PRESETS_DICT = { - RoomCreationPreset.PRIVATE_CHAT: { - "join_rules": JoinRules.INVITE, - "history_visibility": "shared", - "original_invitees_have_ops": False, - "guest_can_join": True, - "power_level_content_override": {"invite": 0}, - }, - RoomCreationPreset.TRUSTED_PRIVATE_CHAT: { - "join_rules": JoinRules.INVITE, - "history_visibility": "shared", - "original_invitees_have_ops": True, - "guest_can_join": True, - "power_level_content_override": {"invite": 0}, - }, - RoomCreationPreset.PUBLIC_CHAT: { - "join_rules": JoinRules.PUBLIC, - "history_visibility": "shared", - "original_invitees_have_ops": False, - "guest_can_join": False, - "power_level_content_override": {}, - }, - } - def __init__(self, hs): super(RoomCreationHandler, self).__init__(hs) @@ -89,6 +67,39 @@ class RoomCreationHandler(BaseHandler): self.room_member_handler = hs.get_room_member_handler() self.config = hs.config + # Room state based off defined presets + self._presets_dict = { + RoomCreationPreset.PRIVATE_CHAT: { + "join_rules": JoinRules.INVITE, + "history_visibility": "shared", + "original_invitees_have_ops": False, + "guest_can_join": True, + "power_level_content_override": {"invite": 0}, + }, + RoomCreationPreset.TRUSTED_PRIVATE_CHAT: { + "join_rules": JoinRules.INVITE, + "history_visibility": "shared", + "original_invitees_have_ops": True, + "guest_can_join": True, + "power_level_content_override": {"invite": 0}, + }, + RoomCreationPreset.PUBLIC_CHAT: { + "join_rules": JoinRules.PUBLIC, + "history_visibility": "shared", + "original_invitees_have_ops": False, + "guest_can_join": False, + "power_level_content_override": {}, + }, + } + + # Modify presets to selectively enable encryption by default per homeserver config + for preset_name, preset_config in self._presets_dict.items(): + encrypted = ( + preset_name + in self.config.encryption_enabled_by_default_for_room_presets + ) + preset_config["encrypted"] = encrypted + self._replication = hs.get_replication_data_handler() # linearizer to stop two upgrades happening at once @@ -364,7 +375,7 @@ class RoomCreationHandler(BaseHandler): # map from event_id to BaseEvent old_room_state_events = await self.store.get_events(old_room_state_ids.values()) - for k, old_event_id in iteritems(old_room_state_ids): + for k, old_event_id in old_room_state_ids.items(): old_event = old_room_state_events.get(old_event_id) if old_event: initial_state[k] = old_event.content @@ -417,7 +428,7 @@ class RoomCreationHandler(BaseHandler): old_room_member_state_events = await self.store.get_events( old_room_member_state_ids.values() ) - for k, old_event in iteritems(old_room_member_state_events): + for k, old_event in old_room_member_state_events.items(): # Only transfer ban events if ( "membership" in old_event.content @@ -582,7 +593,7 @@ class RoomCreationHandler(BaseHandler): "room_version", self.config.default_room_version.identifier ) - if not isinstance(room_version_id, string_types): + if not isinstance(room_version_id, str): raise SynapseError(400, "room_version must be a string", Codes.BAD_JSON) room_version = KNOWN_ROOM_VERSIONS.get(room_version_id) @@ -798,7 +809,7 @@ class RoomCreationHandler(BaseHandler): ) return last_stream_id - config = RoomCreationHandler.PRESETS_DICT[preset_config] + config = self._presets_dict[preset_config] creator_id = creator.user.to_string() @@ -888,6 +899,13 @@ class RoomCreationHandler(BaseHandler): etype=etype, state_key=state_key, content=content ) + if config["encrypted"]: + last_sent_stream_id = await send( + etype=EventTypes.RoomEncryption, + state_key="", + content={"algorithm": RoomEncryptionAlgorithms.DEFAULT}, + ) + return last_sent_stream_id async def _generate_room_id( diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 4cbc02b0d0..5e05be6181 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -17,8 +17,6 @@ import logging from collections import namedtuple from typing import Any, Dict, Optional -from six import iteritems - import msgpack from unpaddedbase64 import decode_base64, encode_base64 @@ -271,7 +269,7 @@ class RoomListHandler(BaseHandler): event_map = yield self.store.get_events( [ event_id - for key, event_id in iteritems(current_state_ids) + for key, event_id in current_state_ids.items() if key[0] in ( EventTypes.Create, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 0f7af982f0..27c479da9e 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -17,10 +17,9 @@ import abc import logging +from http import HTTPStatus from typing import Dict, Iterable, List, Optional, Tuple -from six.moves import http_client - from synapse import types from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError @@ -361,7 +360,7 @@ class RoomMemberHandler(object): if effective_membership_state == Membership.INVITE: # block any attempts to invite the server notices mxid if target.to_string() == self._server_notices_mxid: - raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user") + raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user") block_invite = False @@ -444,7 +443,7 @@ class RoomMemberHandler(object): is_blocked = await self._is_server_notice_room(room_id) if is_blocked: raise SynapseError( - http_client.FORBIDDEN, + HTTPStatus.FORBIDDEN, "You cannot reject this invite", errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM, ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6bdb24baff..4c7524493e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -18,8 +18,6 @@ import itertools import logging from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple -from six import iteritems, itervalues - import attr from prometheus_client import Counter @@ -390,7 +388,7 @@ class SyncHandler(object): # result returned by the event source is poor form (it might cache # the object) room_id = event["room_id"] - event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"} + event_copy = {k: v for (k, v) in event.items() if k != "room_id"} ephemeral_by_room.setdefault(room_id, []).append(event_copy) receipt_key = since_token.receipt_key if since_token else "0" @@ -408,7 +406,7 @@ class SyncHandler(object): for event in receipts: room_id = event["room_id"] # exclude room id, as above - event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"} + event_copy = {k: v for (k, v) in event.items() if k != "room_id"} ephemeral_by_room.setdefault(room_id, []).append(event_copy) return now_token, ephemeral_by_room @@ -454,7 +452,7 @@ class SyncHandler(object): current_state_ids_map = await self.state.get_current_state_ids( room_id ) - current_state_ids = frozenset(itervalues(current_state_ids_map)) + current_state_ids = frozenset(current_state_ids_map.values()) recents = await filter_events_for_client( self.storage, @@ -509,7 +507,7 @@ class SyncHandler(object): current_state_ids_map = await self.state.get_current_state_ids( room_id ) - current_state_ids = frozenset(itervalues(current_state_ids_map)) + current_state_ids = frozenset(current_state_ids_map.values()) loaded_recents = await filter_events_for_client( self.storage, @@ -909,7 +907,7 @@ class SyncHandler(object): logger.debug("filtering state from %r...", state_ids) state_ids = { t: event_id - for t, event_id in iteritems(state_ids) + for t, event_id in state_ids.items() if cache.get(t[1]) != event_id } logger.debug("...to %r", state_ids) @@ -1430,7 +1428,7 @@ class SyncHandler(object): if since_token: for joined_sync in sync_result_builder.joined: it = itertools.chain( - joined_sync.timeline.events, itervalues(joined_sync.state) + joined_sync.timeline.events, joined_sync.state.values() ) for event in it: if event.type == EventTypes.Member: @@ -1505,7 +1503,7 @@ class SyncHandler(object): newly_left_rooms = [] room_entries = [] invited = [] - for room_id, events in iteritems(mem_change_events_by_room_id): + for room_id, events in mem_change_events_by_room_id.items(): logger.debug( "Membership changes in %s: [%s]", room_id, @@ -1993,17 +1991,17 @@ def _calculate_state( event_id_to_key = { e: key for key, e in itertools.chain( - iteritems(timeline_contains), - iteritems(previous), - iteritems(timeline_start), - iteritems(current), + timeline_contains.items(), + previous.items(), + timeline_start.items(), + current.items(), ) } - c_ids = set(itervalues(current)) - ts_ids = set(itervalues(timeline_start)) - p_ids = set(itervalues(previous)) - tc_ids = set(itervalues(timeline_contains)) + c_ids = set(current.values()) + ts_ids = set(timeline_start.values()) + p_ids = set(previous.values()) + tc_ids = set(timeline_contains.values()) # If we are lazyloading room members, we explicitly add the membership events # for the senders in the timeline into the state block returned by /sync, @@ -2017,7 +2015,7 @@ def _calculate_state( if lazy_load_members: p_ids.difference_update( - e for t, e in iteritems(timeline_start) if t[0] == EventTypes.Member + e for t, e in timeline_start.items() if t[0] == EventTypes.Member ) state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index c7bc14c623..6c7abaa578 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -15,9 +15,7 @@ import logging from collections import namedtuple -from typing import List - -from twisted.internet import defer +from typing import List, Tuple from synapse.api.errors import AuthError, SynapseError from synapse.logging.context import run_in_background @@ -115,8 +113,7 @@ class TypingHandler(object): def is_typing(self, member): return member.user_id in self._room_typing.get(member.room_id, []) - @defer.inlineCallbacks - def started_typing(self, target_user, auth_user, room_id, timeout): + async def started_typing(self, target_user, auth_user, room_id, timeout): target_user_id = target_user.to_string() auth_user_id = auth_user.to_string() @@ -126,7 +123,7 @@ class TypingHandler(object): if target_user_id != auth_user_id: raise AuthError(400, "Cannot set another user's typing state") - yield self.auth.check_user_in_room(room_id, target_user_id) + await self.auth.check_user_in_room(room_id, target_user_id) logger.debug("%s has started typing in %s", target_user_id, room_id) @@ -145,8 +142,7 @@ class TypingHandler(object): self._push_update(member=member, typing=True) - @defer.inlineCallbacks - def stopped_typing(self, target_user, auth_user, room_id): + async def stopped_typing(self, target_user, auth_user, room_id): target_user_id = target_user.to_string() auth_user_id = auth_user.to_string() @@ -156,7 +152,7 @@ class TypingHandler(object): if target_user_id != auth_user_id: raise AuthError(400, "Cannot set another user's typing state") - yield self.auth.check_user_in_room(room_id, target_user_id) + await self.auth.check_user_in_room(room_id, target_user_id) logger.debug("%s has stopped typing in %s", target_user_id, room_id) @@ -164,12 +160,11 @@ class TypingHandler(object): self._stopped_typing(member) - @defer.inlineCallbacks def user_left_room(self, user, room_id): user_id = user.to_string() if self.is_mine_id(user_id): member = RoomMember(room_id=room_id, user_id=user_id) - yield self._stopped_typing(member) + self._stopped_typing(member) def _stopped_typing(self, member): if member.user_id not in self._room_typing.get(member.room_id, set()): @@ -188,10 +183,9 @@ class TypingHandler(object): self._push_update_local(member=member, typing=typing) - @defer.inlineCallbacks - def _push_remote(self, member, typing): + async def _push_remote(self, member, typing): try: - users = yield self.state.get_current_users_in_room(member.room_id) + users = await self.state.get_current_users_in_room(member.room_id) self._member_last_federation_poke[member] = self.clock.time_msec() now = self.clock.time_msec() @@ -215,8 +209,7 @@ class TypingHandler(object): except Exception: logger.exception("Error pushing typing notif to remotes") - @defer.inlineCallbacks - def _recv_edu(self, origin, content): + async def _recv_edu(self, origin, content): room_id = content["room_id"] user_id = content["user_id"] @@ -231,7 +224,7 @@ class TypingHandler(object): ) return - users = yield self.state.get_current_users_in_room(room_id) + users = await self.state.get_current_users_in_room(room_id) domains = {get_domain_from_id(u) for u in users} if self.server_name in domains: @@ -259,14 +252,31 @@ class TypingHandler(object): ) async def get_all_typing_updates( - self, last_id: int, current_id: int, limit: int - ) -> List[dict]: - """Get up to `limit` typing updates between the given tokens, earliest - updates first. + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, list]], int, bool]: + """Get updates for typing replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data """ if last_id == current_id: - return [] + return [], current_id, False changed_rooms = self._typing_stream_change_cache.get_all_entities_changed( last_id @@ -280,9 +290,16 @@ class TypingHandler(object): serial = self._room_serials[room_id] if last_id < serial <= current_id: typing = self._room_typing[room_id] - rows.append((serial, room_id, list(typing))) + rows.append((serial, [room_id, list(typing)])) rows.sort() - return rows[:limit] + + limited = False + if len(rows) > limit: + rows = rows[:limit] + current_id = rows[-1][0] + limited = True + + return rows, current_id, limited def get_current_token(self): return self._latest_room_serial @@ -306,7 +323,7 @@ class TypingNotificationEventSource(object): "content": {"user_ids": list(typing)}, } - def get_new_events(self, from_key, room_ids, **kwargs): + async def get_new_events(self, from_key, room_ids, **kwargs): with Measure(self.clock, "typing.get_new_events"): from_key = int(from_key) handler = self.get_typing_handler() @@ -320,7 +337,7 @@ class TypingNotificationEventSource(object): events.append(self._make_event_for(room_id)) - return defer.succeed((events, handler._latest_room_serial)) + return (events, handler._latest_room_serial) def get_current_key(self): return self.get_typing_handler()._latest_room_serial diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 12423b909a..521b6d620d 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -15,8 +15,6 @@ import logging -from six import iteritems, iterkeys - import synapse.metrics from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.handlers.state_deltas import StateDeltasHandler @@ -289,7 +287,7 @@ class UserDirectoryHandler(StateDeltasHandler): users_with_profile = await self.state.get_current_users_in_room(room_id) # Remove every user from the sharing tables for that room. - for user_id in iterkeys(users_with_profile): + for user_id in users_with_profile.keys(): await self.store.remove_user_who_share_room(user_id, room_id) # Then, re-add them to the tables. @@ -298,7 +296,7 @@ class UserDirectoryHandler(StateDeltasHandler): # which when ran over an entire room, will result in the same values # being added multiple times. The batching upserts shouldn't make this # too bad, though. - for user_id, profile in iteritems(users_with_profile): + for user_id, profile in users_with_profile.items(): await self._handle_new_user(room_id, user_id, profile) async def _handle_new_user(self, room_id, user_id, profile): diff --git a/synapse/http/client.py b/synapse/http/client.py index 3cef747a4d..8743e9839d 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -15,11 +15,9 @@ # limitations under the License. import logging +import urllib from io import BytesIO -from six import raise_from, text_type -from six.moves import urllib - import treq from canonicaljson import encode_canonical_json, json from netaddr import IPAddress @@ -577,7 +575,7 @@ class SimpleHttpClient(object): # This can happen e.g. because the body is too large. raise except Exception as e: - raise_from(SynapseError(502, ("Failed to download remote body: %s" % e)), e) + raise SynapseError(502, ("Failed to download remote body: %s" % e)) from e return ( length, @@ -638,7 +636,7 @@ def encode_urlencode_args(args): def encode_urlencode_arg(arg): - if isinstance(arg, text_type): + if isinstance(arg, str): return arg.encode("utf-8") elif isinstance(arg, list): return [encode_urlencode_arg(i) for i in arg] diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index f5f917f5ae..c5fc746f2f 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -48,6 +48,9 @@ class MatrixFederationAgent(object): tls_client_options_factory (FederationPolicyForHTTPS|None): factory to use for fetching client tls options, or none to disable TLS. + user_agent (bytes): + The user agent header to use for federation requests. + _srv_resolver (SrvResolver|None): SRVResolver impl to use for looking up SRV records. None to use a default implementation. @@ -61,6 +64,7 @@ class MatrixFederationAgent(object): self, reactor, tls_client_options_factory, + user_agent, _srv_resolver=None, _well_known_resolver=None, ): @@ -78,6 +82,7 @@ class MatrixFederationAgent(object): ), pool=self._pool, ) + self.user_agent = user_agent if _well_known_resolver is None: _well_known_resolver = WellKnownResolver( @@ -87,6 +92,7 @@ class MatrixFederationAgent(object): pool=self._pool, contextFactory=tls_client_options_factory, ), + user_agent=self.user_agent, ) self._well_known_resolver = _well_known_resolver @@ -149,7 +155,7 @@ class MatrixFederationAgent(object): parsed_uri = urllib.parse.urlparse(uri) # We need to make sure the host header is set to the netloc of the - # server. + # server and that a user-agent is provided. if headers is None: headers = Headers() else: @@ -157,6 +163,8 @@ class MatrixFederationAgent(object): if not headers.hasHeader(b"host"): headers.addRawHeader(b"host", parsed_uri.netloc) + if not headers.hasHeader(b"user-agent"): + headers.addRawHeader(b"user-agent", self.user_agent) res = yield make_deferred_yieldable( self._agent.request(method, uri, headers, bodyProducer) diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py index 7ddfad286d..89a3b041ce 100644 --- a/synapse/http/federation/well_known_resolver.py +++ b/synapse/http/federation/well_known_resolver.py @@ -23,6 +23,7 @@ import attr from twisted.internet import defer from twisted.web.client import RedirectAgent, readBody from twisted.web.http import stringToDatetime +from twisted.web.http_headers import Headers from synapse.logging.context import make_deferred_yieldable from synapse.util import Clock @@ -78,7 +79,12 @@ class WellKnownResolver(object): """ def __init__( - self, reactor, agent, well_known_cache=None, had_well_known_cache=None + self, + reactor, + agent, + user_agent, + well_known_cache=None, + had_well_known_cache=None, ): self._reactor = reactor self._clock = Clock(reactor) @@ -92,6 +98,7 @@ class WellKnownResolver(object): self._well_known_cache = well_known_cache self._had_valid_well_known_cache = had_well_known_cache self._well_known_agent = RedirectAgent(agent) + self.user_agent = user_agent @defer.inlineCallbacks def get_well_known(self, server_name): @@ -227,6 +234,10 @@ class WellKnownResolver(object): uri = b"https://%s/.well-known/matrix/server" % (server_name,) uri_str = uri.decode("ascii") + headers = { + b"User-Agent": [self.user_agent], + } + i = 0 while True: i += 1 @@ -234,7 +245,9 @@ class WellKnownResolver(object): logger.info("Fetching %s", uri_str) try: response = yield make_deferred_yieldable( - self._well_known_agent.request(b"GET", uri) + self._well_known_agent.request( + b"GET", uri, headers=Headers(headers) + ) ) body = yield make_deferred_yieldable(readBody(response)) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 2d47b9ea00..18f6a8fd29 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -17,11 +17,9 @@ import cgi import logging import random import sys +import urllib from io import BytesIO -from six import raise_from, string_types -from six.moves import urllib - import attr import treq from canonicaljson import encode_canonical_json @@ -199,7 +197,14 @@ class MatrixFederationHttpClient(object): self.reactor = Reactor() - self.agent = MatrixFederationAgent(self.reactor, tls_client_options_factory) + user_agent = hs.version_string + if hs.config.user_agent_suffix: + user_agent = "%s %s" % (user_agent, hs.config.user_agent_suffix) + user_agent = user_agent.encode("ascii") + + self.agent = MatrixFederationAgent( + self.reactor, tls_client_options_factory, user_agent + ) # Use a BlacklistingAgentWrapper to prevent circumventing the IP # blacklist via IP literals in server names @@ -432,10 +437,10 @@ class MatrixFederationHttpClient(object): except TimeoutError as e: raise RequestSendFailed(e, can_retry=True) from e except DNSLookupError as e: - raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e) + raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e except Exception as e: logger.info("Failed to send request: %s", e) - raise_from(RequestSendFailed(e, can_retry=True), e) + raise RequestSendFailed(e, can_retry=True) from e incoming_responses_counter.labels( request.method, response.code @@ -487,7 +492,7 @@ class MatrixFederationHttpClient(object): # Retry if the error is a 429 (Too Many Requests), # otherwise just raise a standard HttpResponseException if response.code == 429: - raise_from(RequestSendFailed(e, can_retry=True), e) + raise RequestSendFailed(e, can_retry=True) from e else: raise e @@ -998,7 +1003,7 @@ def encode_query_args(args): encoded_args = {} for k, vs in args.items(): - if isinstance(vs, string_types): + if isinstance(vs, str): vs = [vs] encoded_args[k] = [v.encode("UTF-8") for v in vs] diff --git a/synapse/http/server.py b/synapse/http/server.py index 2331a2a4b0..d192de7923 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -16,10 +16,10 @@ import collections import html -import http.client import logging import types import urllib +from http import HTTPStatus from io import BytesIO from typing import Awaitable, Callable, TypeVar, Union @@ -188,7 +188,7 @@ def return_html_error( exc_info=(f.type, f.value, f.getTracebackObject()), ) else: - code = http.HTTPStatus.INTERNAL_SERVER_ERROR + code = HTTPStatus.INTERNAL_SERVER_ERROR msg = "Internal server error" logger.error( diff --git a/synapse/http/site.py b/synapse/http/site.py index 167293c46d..cbc37eac6e 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -19,6 +19,7 @@ from typing import Optional from twisted.python.failure import Failure from twisted.web.server import Request, Site +from synapse.config.server import ListenerConfig from synapse.http import redact_uri from synapse.http.request_metrics import RequestMetrics, requests_counter from synapse.logging.context import LoggingContext, PreserveLoggingContext @@ -350,7 +351,7 @@ class SynapseSite(Site): self, logger_name, site_tag, - config, + config: ListenerConfig, resource, server_version_string, *args, @@ -360,7 +361,8 @@ class SynapseSite(Site): self.site_tag = site_tag - proxied = config.get("x_forwarded", False) + assert config.http_options is not None + proxied = config.http_options.x_forwarded self.requestFactory = XForwardedForRequest if proxied else SynapseRequest self.access_logger = logging.getLogger(logger_name) self.server_version_string = server_version_string.encode("ascii") diff --git a/synapse/logging/formatter.py b/synapse/logging/formatter.py index fbf570c756..d736ad5b9b 100644 --- a/synapse/logging/formatter.py +++ b/synapse/logging/formatter.py @@ -16,8 +16,7 @@ import logging import traceback - -from six import StringIO +from io import StringIO class LogFormatter(logging.Formatter): diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 5dddf57008..73bef5e5ca 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -171,8 +171,9 @@ import logging import re import types from functools import wraps -from typing import TYPE_CHECKING, Dict +from typing import TYPE_CHECKING, Dict, Optional, Type +import attr from canonicaljson import json from twisted.internet import defer @@ -232,6 +233,30 @@ except ImportError: LogContextScopeManager = None # type: ignore +try: + from rust_python_jaeger_reporter import Reporter + + @attr.s(slots=True, frozen=True) + class _WrappedRustReporter: + """Wrap the reporter to ensure `report_span` never throws. + """ + + _reporter = attr.ib(type=Reporter, default=attr.Factory(Reporter)) + + def set_process(self, *args, **kwargs): + return self._reporter.set_process(*args, **kwargs) + + def report_span(self, span): + try: + return self._reporter.report_span(span) + except Exception: + logger.exception("Failed to report span") + + RustReporter = _WrappedRustReporter # type: Optional[Type[_WrappedRustReporter]] +except ImportError: + RustReporter = None + + logger = logging.getLogger(__name__) @@ -320,11 +345,19 @@ def init_tracer(hs: "HomeServer"): set_homeserver_whitelist(hs.config.opentracer_whitelist) - JaegerConfig( + config = JaegerConfig( config=hs.config.jaeger_config, service_name="{} {}".format(hs.config.server_name, hs.get_instance_name()), scope_manager=LogContextScopeManager(hs.config), - ).initialize_tracer() + ) + + # If we have the rust jaeger reporter available let's use that. + if RustReporter: + logger.info("Using rust_python_jaeger_reporter library") + tracer = config.create_tracer(RustReporter(), config.sampler) + opentracing.set_global_tracer(tracer) + else: + config.initialize_tracer() # Whitelisting diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 9cf31f96b3..6035672698 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -22,8 +22,6 @@ import threading import time from typing import Callable, Dict, Iterable, Optional, Tuple, Union -import six - import attr from prometheus_client import Counter, Gauge, Histogram from prometheus_client.core import ( @@ -83,7 +81,7 @@ class LaterGauge(object): return if isinstance(calls, dict): - for k, v in six.iteritems(calls): + for k, v in calls.items(): g.add_metric(k, v) else: g.add_metric([], calls) @@ -194,7 +192,7 @@ class InFlightGauge(object): gauge = GaugeMetricFamily( "_".join([self.name, name]), "", labels=self.labels ) - for key, metrics in six.iteritems(metrics_by_key): + for key, metrics in metrics_by_key.items(): gauge.add_metric(key, getattr(metrics, name)) yield gauge @@ -465,6 +463,12 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name" # finished being processed. event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"]) +event_processing_lag_by_event = Histogram( + "synapse_event_processing_lag_by_event", + "Time between an event being persisted and it being queued up to be sent to the relevant remote servers", + ["name"], +) + # Build info of the running server. build_info = Gauge( "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"] diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py index ab7f948ed4..4304c60d56 100644 --- a/synapse/metrics/_exposition.py +++ b/synapse/metrics/_exposition.py @@ -208,6 +208,7 @@ class MetricsHandler(BaseHTTPRequestHandler): raise self.send_response(200) self.send_header("Content-Type", CONTENT_TYPE_LATEST) + self.send_header("Content-Length", str(len(output))) self.end_headers() self.wfile.write(output) @@ -261,4 +262,6 @@ class MetricsResource(Resource): def render_GET(self, request): request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii")) - return generate_latest(self.registry) + response = generate_latest(self.registry) + request.setHeader(b"Content-Length", str(len(response))) + return response diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index e75d964ac8..43ffe6faf0 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -17,8 +17,6 @@ import logging from collections import namedtuple -from six import iteritems, itervalues - from prometheus_client import Counter from twisted.internet import defer @@ -130,7 +128,7 @@ class BulkPushRuleEvaluator(object): event, prev_state_ids, for_verification=False ) auth_events = yield self.store.get_events(auth_events_ids) - auth_events = {(e.type, e.state_key): e for e in itervalues(auth_events)} + auth_events = {(e.type, e.state_key): e for e in auth_events.values()} sender_level = get_user_power_level(event.sender, auth_events) @@ -162,7 +160,7 @@ class BulkPushRuleEvaluator(object): condition_cache = {} - for uid, rules in iteritems(rules_by_user): + for uid, rules in rules_by_user.items(): if event.sender == uid: continue @@ -395,7 +393,7 @@ class RulesForRoom(object): # If the event is a join event then it will be in current state evnts # map but not in the DB, so we have to explicitly insert it. if event.type == EventTypes.Member: - for event_id in itervalues(member_event_ids): + for event_id in member_event_ids.values(): if event_id == event.event_id: members[event_id] = (event.state_key, event.membership) @@ -404,7 +402,7 @@ class RulesForRoom(object): interested_in_user_ids = { user_id - for user_id, membership in itervalues(members) + for user_id, membership in members.values() if membership == Membership.JOIN } @@ -415,7 +413,7 @@ class RulesForRoom(object): ) user_ids = { - uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher + uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher } logger.debug("With pushers: %r", user_ids) @@ -436,7 +434,7 @@ class RulesForRoom(object): ) ret_rules_by_user.update( - item for item in iteritems(rules_by_user) if item[0] is not None + item for item in rules_by_user.items() if item[0] is not None ) self.update_cache(sequence, members, ret_rules_by_user, state_group) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index eaaa7afc91..2fac07593b 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -20,6 +20,7 @@ from prometheus_client import Counter from twisted.internet import defer from twisted.internet.error import AlreadyCalled, AlreadyCancelled +from synapse.api.constants import EventTypes from synapse.logging import opentracing from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import PusherConfigException @@ -129,6 +130,8 @@ class HttpPusher(object): @defer.inlineCallbacks def _update_badge(self): + # XXX as per https://github.com/matrix-org/matrix-doc/issues/2627, this seems + # to be largely redundant. perhaps we can remove it. badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id) yield self._send_badge(badge) @@ -303,12 +306,23 @@ class HttpPusher(object): @defer.inlineCallbacks def _build_notification_dict(self, event, tweaks, badge): + priority = "low" + if ( + event.type == EventTypes.Encrypted + or tweaks.get("highlight") + or tweaks.get("sound") + ): + # HACK send our push as high priority only if it generates a sound, highlight + # or may do so (i.e. is encrypted so has unknown effects). + priority = "high" + if self.data.get("format") == "event_id_only": d = { "notification": { "event_id": event.event_id, "room_id": event.room_id, "counts": {"unread": badge}, + "prio": priority, "devices": [ { "app_id": self.app_id, @@ -332,9 +346,8 @@ class HttpPusher(object): "room_id": event.room_id, "type": event.type, "sender": event.user_id, - "counts": { # -- we don't mark messages as read yet so - # we have no way of knowing - # Just set the badge to 1 until we have read receipts + "prio": priority, + "counts": { "unread": badge, # 'missed_calls': 2 }, diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index d57a66a697..dda560b2c2 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -17,12 +17,11 @@ import email.mime.multipart import email.utils import logging import time +import urllib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from typing import Iterable, List, TypeVar -from six.moves import urllib - import bleach import jinja2 diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 11032491af..8e0d3a416d 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -18,8 +18,6 @@ import logging import re from typing import Pattern -from six import string_types - from synapse.events import EventBase from synapse.types import UserID from synapse.util.caches import register_cache @@ -131,7 +129,7 @@ class PushRuleEvaluatorForEvent(object): # XXX: optimisation: cache our pattern regexps if condition["key"] == "content.body": body = self._event.content.get("body", None) - if not body: + if not body or not isinstance(body, str): return False return _glob_matches(pattern, body, word_boundary=True) @@ -147,7 +145,7 @@ class PushRuleEvaluatorForEvent(object): return False body = self._event.content.get("body", None) - if not body: + if not body or not isinstance(body, str): return False # Similar to _glob_matches, but do not treat display_name as a glob. @@ -244,7 +242,7 @@ def _flatten_dict(d, prefix=[], result=None): if result is None: result = {} for key, value in d.items(): - if isinstance(value, string_types): + if isinstance(value, str): result[".".join(prefix + [key])] = value.lower() elif hasattr(value, "items"): _flatten_dict(value, prefix=(prefix + [key]), result=result) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 88d203aa44..f6a5458681 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -215,11 +215,9 @@ class PusherPool: try: # Need to subtract 1 from the minimum because the lower bound here # is not inclusive - updated_receipts = yield self.store.get_all_updated_receipts( + users_affected = yield self.store.get_users_sent_receipts_between( min_stream_id - 1, max_stream_id ) - # This returns a tuple, user_id is at index 3 - users_affected = {r[3] for r in updated_receipts} for u in users_affected: if u in self.pushers: diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 8b4312e5a3..b1cac901eb 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -66,11 +66,9 @@ REQUIREMENTS = [ "pymacaroons>=0.13.0", "msgpack>=0.5.2", "phonenumbers>=8.2.0", - "six>=1.10", "prometheus_client>=0.0.18,<0.8.0", - # we use attr.s(slots), which arrived in 16.0.0 - # Twisted 18.7.0 requires attrs>=17.4.0 - "attrs>=17.4.0", + # we use attr.validators.deep_iterable, which arrived in 19.1.0 + "attrs>=19.1.0", "netaddr>=0.7.18", "Jinja2>=2.9", "bleach>=1.4.3", @@ -95,7 +93,12 @@ CONDITIONAL_REQUIREMENTS = { "oidc": ["authlib>=0.14.0"], "systemd": ["systemd-python>=231"], "url_preview": ["lxml>=3.5.0"], - "test": ["mock>=2.0", "parameterized"], + # Dependencies which are exclusively required by unit test code. This is + # NOT a list of all modules that are necessary to run the unit tests. + # Tests assume that all optional dependencies are installed. + # + # parameterized_class decorator was introduced in parameterized 0.7.0 + "test": ["mock>=2.0", "parameterized>=0.7.0"], "sentry": ["sentry-sdk>=0.7.2"], "opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"], "jwt": ["pyjwt>=1.6.4"], diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 793cef6c26..9caf1e80c1 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -16,12 +16,10 @@ import abc import logging import re +import urllib from inspect import signature from typing import Dict, List, Tuple -from six import raise_from -from six.moves import urllib - from twisted.internet import defer from synapse.api.errors import ( @@ -220,7 +218,7 @@ class ReplicationEndpoint(object): # importantly, not stack traces everywhere) raise e.to_synapse_error() except RequestSendFailed as e: - raise_from(SynapseError(502, "Failed to talk to master"), e) + raise SynapseError(502, "Failed to talk to master") from e return result diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py index 9db6c62bc7..525b94fd87 100644 --- a/synapse/replication/slave/storage/account_data.py +++ b/synapse/replication/slave/storage/account_data.py @@ -16,6 +16,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker +from synapse.replication.tcp.streams import AccountDataStream, TagAccountDataStream from synapse.storage.data_stores.main.account_data import AccountDataWorkerStore from synapse.storage.data_stores.main.tags import TagsWorkerStore from synapse.storage.database import Database @@ -39,12 +40,12 @@ class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlaved return self._account_data_id_gen.get_current_token() def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == "tag_account_data": + if stream_name == TagAccountDataStream.NAME: self._account_data_id_gen.advance(token) for row in rows: self.get_tags_for_user.invalidate((row.user_id,)) self._account_data_stream_cache.entity_has_changed(row.user_id, token) - elif stream_name == "account_data": + elif stream_name == AccountDataStream.NAME: self._account_data_id_gen.advance(token) for row in rows: if not row.room_id: diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 6e7fd259d4..bd394f6b00 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -15,6 +15,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker +from synapse.replication.tcp.streams import ToDeviceStream from synapse.storage.data_stores.main.deviceinbox import DeviceInboxWorkerStore from synapse.storage.database import Database from synapse.util.caches.expiringcache import ExpiringCache @@ -44,7 +45,7 @@ class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore): ) def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == "to_device": + if stream_name == ToDeviceStream.NAME: self._device_inbox_id_gen.advance(token) for row in rows: if row.entity.startswith("@"): diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py index 1851e7d525..5d210fa3a1 100644 --- a/synapse/replication/slave/storage/groups.py +++ b/synapse/replication/slave/storage/groups.py @@ -15,6 +15,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker +from synapse.replication.tcp.streams import GroupServerStream from synapse.storage.data_stores.main.group_server import GroupServerWorkerStore from synapse.storage.database import Database from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -38,7 +39,7 @@ class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore): return self._group_updates_id_gen.get_current_token() def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == "groups": + if stream_name == GroupServerStream.NAME: self._group_updates_id_gen.advance(token) for row in rows: self._group_updates_stream_cache.entity_has_changed(row.user_id, token) diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py index 4e0124842d..2938cb8e43 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.replication.tcp.streams import PresenceStream from synapse.storage import DataStore from synapse.storage.data_stores.main.presence import PresenceStore from synapse.storage.database import Database @@ -42,7 +43,7 @@ class SlavedPresenceStore(BaseSlavedStore): return self._presence_id_gen.get_current_token() def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == "presence": + if stream_name == PresenceStream.NAME: self._presence_id_gen.advance(token) for row in rows: self.presence_stream_cache.entity_has_changed(row.user_id, token) diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py index 6adb19463a..23ec1c5b11 100644 --- a/synapse/replication/slave/storage/push_rule.py +++ b/synapse/replication/slave/storage/push_rule.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.replication.tcp.streams import PushRulesStream from synapse.storage.data_stores.main.push_rule import PushRulesWorkerStore from .events import SlavedEventStore @@ -30,7 +31,7 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore): return self._push_rules_stream_id_gen.get_current_token() def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == "push_rules": + if stream_name == PushRulesStream.NAME: self._push_rules_stream_id_gen.advance(token) for row in rows: self.get_push_rules_for_user.invalidate((row.user_id,)) diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py index cb78b49acb..ff449f3658 100644 --- a/synapse/replication/slave/storage/pushers.py +++ b/synapse/replication/slave/storage/pushers.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.replication.tcp.streams import PushersStream from synapse.storage.data_stores.main.pusher import PusherWorkerStore from synapse.storage.database import Database @@ -32,6 +33,6 @@ class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore): return self._pushers_id_gen.get_current_token() def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == "pushers": + if stream_name == PushersStream.NAME: self._pushers_id_gen.advance(token) return super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py index be716cc558..6982686eb5 100644 --- a/synapse/replication/slave/storage/receipts.py +++ b/synapse/replication/slave/storage/receipts.py @@ -14,20 +14,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.replication.tcp.streams import ReceiptsStream from synapse.storage.data_stores.main.receipts import ReceiptsWorkerStore from synapse.storage.database import Database from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker -# So, um, we want to borrow a load of functions intended for reading from -# a DataStore, but we don't want to take functions that either write to the -# DataStore or are cached and don't have cache invalidation logic. -# -# Rather than write duplicate versions of those functions, or lift them to -# a common base class, we going to grab the underlying __func__ object from -# the method descriptor on the DataStore and chuck them into our class. - class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore): def __init__(self, database: Database, db_conn, hs): @@ -52,7 +45,7 @@ class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore): self.get_receipts_for_room.invalidate((room_id, receipt_type)) def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == "receipts": + if stream_name == ReceiptsStream.NAME: self._receipts_id_gen.advance(token) for row in rows: self.invalidate_caches_for_receipt( diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py index 8873bf37e5..8710207ada 100644 --- a/synapse/replication/slave/storage/room.py +++ b/synapse/replication/slave/storage/room.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.replication.tcp.streams import PublicRoomsStream from synapse.storage.data_stores.main.room import RoomWorkerStore from synapse.storage.database import Database @@ -31,7 +32,7 @@ class RoomStore(RoomWorkerStore, BaseSlavedStore): return self._public_room_id_gen.get_current_token() def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == "public_rooms": + if stream_name == PublicRoomsStream.NAME: self._public_room_id_gen.advance(token) return super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index c04f622816..ea5937a20c 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -149,7 +149,7 @@ class RdataCommand(Command): class PositionCommand(Command): - """Sent by the server to tell the client the stream postition without + """Sent by the server to tell the client the stream position without needing to send an RDATA. Format:: @@ -188,7 +188,7 @@ class ErrorCommand(_SimpleCommand): class PingCommand(_SimpleCommand): - """Sent by either side as a keep alive. The data is arbitary (often timestamp) + """Sent by either side as a keep alive. The data is arbitrary (often timestamp) """ NAME = "PING" diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index cbcf46f3ae..e6a2e2598b 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -112,8 +112,8 @@ class ReplicationCommandHandler: "replication_position", clock=self._clock ) - # Map of stream to batched updates. See RdataCommand for info on how - # batching works. + # Map of stream name to batched updates. See RdataCommand for info on + # how batching works. self._pending_batches = {} # type: Dict[str, List[Any]] # The factory used to create connections. @@ -123,7 +123,8 @@ class ReplicationCommandHandler: # outgoing replication commands to.) self._connections = [] # type: List[AbstractConnection] - # For each connection, the incoming streams that are coming from that connection + # For each connection, the incoming stream names that are coming from + # that connection. self._streams_by_connection = {} # type: Dict[AbstractConnection, Set[str]] LaterGauge( @@ -310,7 +311,28 @@ class ReplicationCommandHandler: # Check if this is the last of a batch of updates rows = self._pending_batches.pop(stream_name, []) rows.append(row) - await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows) + + stream = self._streams.get(stream_name) + if not stream: + logger.error("Got RDATA for unknown stream: %s", stream_name) + return + + # Find where we previously streamed up to. + current_token = stream.current_token(cmd.instance_name) + + # Discard this data if this token is earlier than the current + # position. Note that streams can be reset (in which case you + # expect an earlier token), but that must be preceded by a + # POSITION command. + if cmd.token <= current_token: + logger.debug( + "Discarding RDATA from stream %s at position %s before previous position %s", + stream_name, + cmd.token, + current_token, + ) + else: + await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows) async def on_rdata( self, stream_name: str, instance_name: str, token: int, rows: list diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 4acefc8a96..f196eff072 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -264,7 +264,7 @@ class BackfillStream(Stream): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_current_backfill_token), - db_query_to_update_function(store.get_all_new_backfill_event_rows), + store.get_all_new_backfill_event_rows, ) @@ -291,9 +291,7 @@ class PresenceStream(Stream): if hs.config.worker_app is None: # on the master, query the presence handler presence_handler = hs.get_presence_handler() - update_function = db_query_to_update_function( - presence_handler.get_all_presence_updates - ) + update_function = presence_handler.get_all_presence_updates else: # Query master process update_function = make_http_update_function(hs, self.NAME) @@ -318,9 +316,7 @@ class TypingStream(Stream): if hs.config.worker_app is None: # on the master, query the typing handler - update_function = db_query_to_update_function( - typing_handler.get_all_typing_updates - ) + update_function = typing_handler.get_all_typing_updates else: # Query master process update_function = make_http_update_function(hs, self.NAME) @@ -352,7 +348,7 @@ class ReceiptsStream(Stream): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_max_receipt_stream_id), - db_query_to_update_function(store.get_all_updated_receipts), + store.get_all_updated_receipts, ) @@ -367,26 +363,17 @@ class PushRulesStream(Stream): def __init__(self, hs): self.store = hs.get_datastore() + super(PushRulesStream, self).__init__( - hs.get_instance_name(), self._current_token, self._update_function + hs.get_instance_name(), + self._current_token, + self.store.get_all_push_rule_updates, ) def _current_token(self, instance_name: str) -> int: push_rules_token, _ = self.store.get_push_rules_stream_token() return push_rules_token - async def _update_function( - self, instance_name: str, from_token: Token, to_token: Token, limit: int - ): - rows = await self.store.get_all_push_rule_updates(from_token, to_token, limit) - - limited = False - if len(rows) == limit: - to_token = rows[-1][0] - limited = True - - return [(row[0], (row[2],)) for row in rows], to_token, limited - class PushersStream(Stream): """A user has added/changed/removed a pusher diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 8173baef8f..e07c32118d 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -15,7 +15,7 @@ import logging from typing import List, Optional -from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.api.constants import EventTypes, JoinRules, Membership, RoomCreationPreset from synapse.api.errors import Codes, NotFoundError, SynapseError from synapse.http.servlet import ( RestServlet, @@ -77,7 +77,7 @@ class ShutdownRoomRestServlet(RestServlet): info, stream_id = await self._room_creation_handler.create_room( room_creator_requester, config={ - "preset": "public_chat", + "preset": RoomCreationPreset.PUBLIC_CHAT, "name": room_name, "power_level_content_override": {"users_default": -10}, }, diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index fefc8f71fa..e4330c39d6 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -16,9 +16,7 @@ import hashlib import hmac import logging import re - -from six import text_type -from six.moves import http_client +from http import HTTPStatus from synapse.api.constants import UserTypes from synapse.api.errors import Codes, NotFoundError, SynapseError @@ -215,10 +213,7 @@ class UserRestServletV2(RestServlet): await self.store.set_server_admin(target_user, set_admin_to) if "password" in body: - if ( - not isinstance(body["password"], text_type) - or len(body["password"]) > 512 - ): + if not isinstance(body["password"], str) or len(body["password"]) > 512: raise SynapseError(400, "Invalid password") else: new_password = body["password"] @@ -252,7 +247,7 @@ class UserRestServletV2(RestServlet): password = body.get("password") password_hash = None if password is not None: - if not isinstance(password, text_type) or len(password) > 512: + if not isinstance(password, str) or len(password) > 512: raise SynapseError(400, "Invalid password") password_hash = await self.auth_handler.hash(password) @@ -370,10 +365,7 @@ class UserRegisterServlet(RestServlet): 400, "username must be specified", errcode=Codes.BAD_JSON ) else: - if ( - not isinstance(body["username"], text_type) - or len(body["username"]) > 512 - ): + if not isinstance(body["username"], str) or len(body["username"]) > 512: raise SynapseError(400, "Invalid username") username = body["username"].encode("utf-8") @@ -386,7 +378,7 @@ class UserRegisterServlet(RestServlet): ) else: password = body["password"] - if not isinstance(password, text_type) or len(password) > 512: + if not isinstance(password, str) or len(password) > 512: raise SynapseError(400, "Invalid password") password_bytes = password.encode("utf-8") @@ -477,7 +469,7 @@ class DeactivateAccountRestServlet(RestServlet): erase = body.get("erase", False) if not isinstance(erase, bool): raise SynapseError( - http_client.BAD_REQUEST, + HTTPStatus.BAD_REQUEST, "Param 'erase' must be a boolean, if given", Codes.BAD_JSON, ) diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index dceb2792fa..bf0f9bd077 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -60,10 +60,18 @@ def login_id_thirdparty_from_phone(identifier): Returns: Login identifier dict of type 'm.id.threepid' """ - if "country" not in identifier or "number" not in identifier: + if "country" not in identifier or ( + # The specification requires a "phone" field, while Synapse used to require a "number" + # field. Accept both for backwards compatibility. + "phone" not in identifier + and "number" not in identifier + ): raise SynapseError(400, "Invalid phone-type identifier") - msisdn = phone_number_to_msisdn(identifier["country"], identifier["number"]) + # Accept both "phone" and "number" as valid keys in m.id.phone + phone_number = identifier.get("phone", identifier["number"]) + + msisdn = phone_number_to_msisdn(identifier["country"], phone_number) return {"type": "m.id.thirdparty", "medium": "msisdn", "address": msisdn} @@ -73,7 +81,8 @@ class LoginRestServlet(RestServlet): CAS_TYPE = "m.login.cas" SSO_TYPE = "m.login.sso" TOKEN_TYPE = "m.login.token" - JWT_TYPE = "m.login.jwt" + JWT_TYPE = "org.matrix.login.jwt" + JWT_TYPE_DEPRECATED = "m.login.jwt" def __init__(self, hs): super(LoginRestServlet, self).__init__() @@ -108,6 +117,7 @@ class LoginRestServlet(RestServlet): flows = [] if self.jwt_enabled: flows.append({"type": LoginRestServlet.JWT_TYPE}) + flows.append({"type": LoginRestServlet.JWT_TYPE_DEPRECATED}) if self.cas_enabled: # we advertise CAS for backwards compat, though MSC1721 renamed it @@ -141,6 +151,7 @@ class LoginRestServlet(RestServlet): try: if self.jwt_enabled and ( login_submission["type"] == LoginRestServlet.JWT_TYPE + or login_submission["type"] == LoginRestServlet.JWT_TYPE_DEPRECATED ): result = await self.do_jwt_login(login_submission) elif login_submission["type"] == LoginRestServlet.TOKEN_TYPE: diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index eec16f8ad8..970fdd5834 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -17,8 +17,6 @@ """ import logging -from six import string_types - from synapse.api.errors import AuthError, SynapseError from synapse.handlers.presence import format_user_presence_state from synapse.http.servlet import RestServlet, parse_json_object_from_request @@ -51,7 +49,9 @@ class PresenceStatusRestServlet(RestServlet): raise AuthError(403, "You are not allowed to see their presence.") state = await self.presence_handler.get_state(target_user=user) - state = format_user_presence_state(state, self.clock.time_msec()) + state = format_user_presence_state( + state, self.clock.time_msec(), include_user_id=False + ) return 200, state @@ -71,7 +71,7 @@ class PresenceStatusRestServlet(RestServlet): if "status_msg" in content: state["status_msg"] = content.pop("status_msg") - if not isinstance(state["status_msg"], string_types): + if not isinstance(state["status_msg"], str): raise SynapseError(400, "status_msg must be a string.") if content: diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 105e0cf4d2..46811abbfa 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -18,8 +18,7 @@ import logging import re from typing import List, Optional - -from six.moves.urllib import parse as urlparse +from urllib import parse as urlparse from canonicaljson import json diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index b58a77826f..182a308eef 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -15,8 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging - -from six.moves import http_client +from http import HTTPStatus from synapse.api.constants import LoginType from synapse.api.errors import Codes, SynapseError, ThreepidValidationError @@ -320,7 +319,7 @@ class DeactivateAccountRestServlet(RestServlet): erase = body.get("erase", False) if not isinstance(erase, bool): raise SynapseError( - http_client.BAD_REQUEST, + HTTPStatus.BAD_REQUEST, "Param 'erase' must be a boolean, if given", Codes.BAD_JSON, ) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index c8d2de7b54..56a451c42f 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -18,8 +18,6 @@ import hmac import logging from typing import List, Union -from six import string_types - import synapse import synapse.api.auth import synapse.types @@ -411,7 +409,7 @@ class RegisterRestServlet(RestServlet): # in sessions. Pull out the username/password provided to us. if "password" in body: password = body.pop("password") - if not isinstance(password, string_types) or len(password) > 512: + if not isinstance(password, str) or len(password) > 512: raise SynapseError(400, "Invalid password") self.password_policy_handler.validate_password(password) @@ -423,10 +421,7 @@ class RegisterRestServlet(RestServlet): desired_username = None if "username" in body: - if ( - not isinstance(body["username"], string_types) - or len(body["username"]) > 512 - ): + if not isinstance(body["username"], str) or len(body["username"]) > 512: raise SynapseError(400, "Invalid username") desired_username = body["username"] @@ -451,7 +446,7 @@ class RegisterRestServlet(RestServlet): access_token = self.auth.get_access_token_from_request(request) - if isinstance(desired_username, string_types): + if isinstance(desired_username, str): result = await self._do_appservice_registration( desired_username, access_token, body ) diff --git a/synapse/rest/client/v2_alpha/report_event.py b/synapse/rest/client/v2_alpha/report_event.py index f067b5edac..e15927c4ea 100644 --- a/synapse/rest/client/v2_alpha/report_event.py +++ b/synapse/rest/client/v2_alpha/report_event.py @@ -14,9 +14,7 @@ # limitations under the License. import logging - -from six import string_types -from six.moves import http_client +from http import HTTPStatus from synapse.api.errors import Codes, SynapseError from synapse.http.servlet import ( @@ -47,15 +45,15 @@ class ReportEventRestServlet(RestServlet): body = parse_json_object_from_request(request) assert_params_in_dict(body, ("reason", "score")) - if not isinstance(body["reason"], string_types): + if not isinstance(body["reason"], str): raise SynapseError( - http_client.BAD_REQUEST, + HTTPStatus.BAD_REQUEST, "Param 'reason' must be a string", Codes.BAD_JSON, ) if not isinstance(body["score"], int): raise SynapseError( - http_client.BAD_REQUEST, + HTTPStatus.BAD_REQUEST, "Param 'score' must be an integer", Codes.BAD_JSON, ) diff --git a/synapse/rest/consent/consent_resource.py b/synapse/rest/consent/consent_resource.py index 4a20282d1b..0a890c98cb 100644 --- a/synapse/rest/consent/consent_resource.py +++ b/synapse/rest/consent/consent_resource.py @@ -16,10 +16,9 @@ import hmac import logging from hashlib import sha256 +from http import HTTPStatus from os import path -from six.moves import http_client - import jinja2 from jinja2 import TemplateNotFound @@ -219,4 +218,4 @@ class ConsentResource(DirectServeResource): ) if not compare_digest(want_mac, userhmac): - raise SynapseError(http_client.FORBIDDEN, "HMAC incorrect") + raise SynapseError(HTTPStatus.FORBIDDEN, "HMAC incorrect") diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py index 3689777266..595849f9d5 100644 --- a/synapse/rest/media/v1/_base.py +++ b/synapse/rest/media/v1/_base.py @@ -16,8 +16,7 @@ import logging import os - -from six.moves import urllib +import urllib from twisted.internet import defer from twisted.protocols.basic import FileSender diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index fd10d42f2f..45628c07b4 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -20,8 +20,6 @@ import os import shutil from typing import Dict, Tuple -from six import iteritems - import twisted.internet.error import twisted.web.http from twisted.web.resource import Resource @@ -340,7 +338,7 @@ class MediaRepository(object): with self.media_storage.store_into_file(file_info) as (f, fname, finish): request_path = "/".join( - ("/_matrix/media/v1/download", server_name, media_id) + ("/_matrix/media/r0/download", server_name, media_id) ) try: length, headers = await self.client.get_file( @@ -606,7 +604,7 @@ class MediaRepository(object): thumbnails[(t_width, t_height, r_type)] = r_method # Now we generate the thumbnails for each dimension, store it - for (t_width, t_height, t_type), t_method in iteritems(thumbnails): + for (t_width, t_height, t_type), t_method in thumbnails.items(): # Generate the thumbnail if t_method == "crop": t_byte_source = await defer_to_thread( @@ -705,7 +703,7 @@ class MediaRepositoryResource(Resource): Uploads are POSTed to a resource which returns a token which is used to GET the download:: - => POST /_matrix/media/v1/upload HTTP/1.1 + => POST /_matrix/media/r0/upload HTTP/1.1 Content-Type: <media-type> Content-Length: <content-length> @@ -716,7 +714,7 @@ class MediaRepositoryResource(Resource): { "content_uri": "mxc://<server-name>/<media-id>" } - => GET /_matrix/media/v1/download/<server-name>/<media-id> HTTP/1.1 + => GET /_matrix/media/r0/download/<server-name>/<media-id> HTTP/1.1 <= HTTP/1.1 200 OK Content-Type: <media-type> @@ -727,7 +725,7 @@ class MediaRepositoryResource(Resource): Clients can get thumbnails by supplying a desired width and height and thumbnailing method:: - => GET /_matrix/media/v1/thumbnail/<server_name> + => GET /_matrix/media/r0/thumbnail/<server_name> /<media-id>?width=<w>&height=<h>&method=<m> HTTP/1.1 <= HTTP/1.1 200 OK diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index 683a79c966..79cb0dddbe 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -17,9 +17,6 @@ import contextlib import logging import os import shutil -import sys - -import six from twisted.internet import defer from twisted.protocols.basic import FileSender @@ -117,12 +114,11 @@ class MediaStorage(object): with open(fname, "wb") as f: yield f, fname, finish except Exception: - t, v, tb = sys.exc_info() try: os.remove(fname) except Exception: pass - six.reraise(t, v, tb) + raise if not finished_called: raise Exception("Finished callback not called") diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index f206605727..b4645cd608 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -24,10 +24,7 @@ import shutil import sys import traceback from typing import Dict, Optional - -import six -from six import string_types -from six.moves import urllib_parse as urlparse +from urllib import parse as urlparse from canonicaljson import json @@ -85,6 +82,15 @@ class PreviewUrlResource(DirectServeResource): self.primary_base_path = media_repo.primary_base_path self.media_storage = media_storage + # We run the background jobs if we're the instance specified (or no + # instance is specified, where we assume there is only one instance + # serving media). + instance_running_jobs = hs.config.media.media_instance_running_background_jobs + self._worker_run_media_background_jobs = ( + instance_running_jobs is None + or instance_running_jobs == hs.get_instance_name() + ) + self.url_preview_url_blacklist = hs.config.url_preview_url_blacklist self.url_preview_accept_language = hs.config.url_preview_accept_language @@ -97,9 +103,10 @@ class PreviewUrlResource(DirectServeResource): expiry_ms=60 * 60 * 1000, ) - self._cleaner_loop = self.clock.looping_call( - self._start_expire_url_cache_data, 10 * 1000 - ) + if self._worker_run_media_background_jobs: + self._cleaner_loop = self.clock.looping_call( + self._start_expire_url_cache_data, 10 * 1000 + ) def render_OPTIONS(self, request): request.setHeader(b"Allow", b"OPTIONS, GET") @@ -188,7 +195,7 @@ class PreviewUrlResource(DirectServeResource): # It may be stored as text in the database, not as bytes (such as # PostgreSQL). If so, encode it back before handing it on. og = cache_result["og"] - if isinstance(og, six.text_type): + if isinstance(og, str): og = og.encode("utf8") return og @@ -400,6 +407,8 @@ class PreviewUrlResource(DirectServeResource): """ # TODO: Delete from backup media store + assert self._worker_run_media_background_jobs + now = self.clock.time_msec() logger.debug("Running url preview cache expiry") @@ -631,7 +640,7 @@ def _iterate_over_text(tree, *tags_to_ignore): if el is None: return - if isinstance(el, string_types): + if isinstance(el, str): yield el elif el.tag not in tags_to_ignore: # el.text is the text before the first child, so we can immediately diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py index 3bf330da49..3bfc8d7278 100644 --- a/synapse/server_notices/consent_server_notices.py +++ b/synapse/server_notices/consent_server_notices.py @@ -14,8 +14,6 @@ # limitations under the License. import logging -from six import iteritems, string_types - from synapse.api.errors import SynapseError from synapse.api.urls import ConsentURIBuilder from synapse.config import ConfigError @@ -118,10 +116,10 @@ def copy_with_str_subst(x, substitutions): Returns: copy of x """ - if isinstance(x, string_types): + if isinstance(x, str): return x % substitutions if isinstance(x, dict): - return {k: copy_with_str_subst(v, substitutions) for (k, v) in iteritems(x)} + return {k: copy_with_str_subst(v, substitutions) for (k, v) in x.items()} if isinstance(x, (list, tuple)): return [copy_with_str_subst(y) for y in x] diff --git a/synapse/server_notices/resource_limits_server_notices.py b/synapse/server_notices/resource_limits_server_notices.py index 73f2cedb5c..4404ceff93 100644 --- a/synapse/server_notices/resource_limits_server_notices.py +++ b/synapse/server_notices/resource_limits_server_notices.py @@ -14,8 +14,6 @@ # limitations under the License. import logging -from six import iteritems - from synapse.api.constants import ( EventTypes, LimitBlockingTypes, @@ -214,7 +212,7 @@ class ResourceLimitsServerNotices(object): referenced_events = list(pinned_state_event.content.get("pinned", [])) events = await self._store.get_events(referenced_events) - for event_id, event in iteritems(events): + for event_id, event in events.items(): if event.type != EventTypes.Message: continue if event.content.get("msgtype") == ServerNoticeMsgType: diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 2fa529fcd0..495d9f04c8 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -18,8 +18,6 @@ import logging from collections import namedtuple from typing import Dict, Iterable, List, Optional, Set -from six import iteritems, itervalues - import attr from frozendict import frozendict from prometheus_client import Histogram @@ -34,6 +32,7 @@ from synapse.logging.utils import log_function from synapse.state import v1, v2 from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour from synapse.types import StateMap +from synapse.util import Clock from synapse.util.async_helpers import Linearizer from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import Measure, measure_func @@ -144,7 +143,7 @@ class StateHandler(object): list(state.values()), get_prev_content=False ) state = { - key: state_map[e_id] for key, e_id in iteritems(state) if e_id in state_map + key: state_map[e_id] for key, e_id in state.items() if e_id in state_map } return state @@ -416,6 +415,7 @@ class StateHandler(object): with Measure(self.clock, "state._resolve_events"): new_state = yield resolve_events_with_store( + self.clock, event.room_id, room_version, state_set_ids, @@ -423,7 +423,7 @@ class StateHandler(object): state_res_store=StateResolutionStore(self.store), ) - new_state = {key: state_map[ev_id] for key, ev_id in iteritems(new_state)} + new_state = {key: state_map[ev_id] for key, ev_id in new_state.items()} return new_state @@ -505,8 +505,8 @@ class StateResolutionHandler(object): # resolve_events_with_store do it? new_state = {} conflicted_state = False - for st in itervalues(state_groups_ids): - for key, e_id in iteritems(st): + for st in state_groups_ids.values(): + for key, e_id in st.items(): if key in new_state: conflicted_state = True break @@ -518,9 +518,10 @@ class StateResolutionHandler(object): logger.info("Resolving conflicted state for %r", room_id) with Measure(self.clock, "state._resolve_events"): new_state = yield resolve_events_with_store( + self.clock, room_id, room_version, - list(itervalues(state_groups_ids)), + list(state_groups_ids.values()), event_map=event_map, state_res_store=state_res_store, ) @@ -561,12 +562,12 @@ def _make_state_cache_entry(new_state, state_groups_ids): # not get persisted. # first look for exact matches - new_state_event_ids = set(itervalues(new_state)) - for sg, state in iteritems(state_groups_ids): + new_state_event_ids = set(new_state.values()) + for sg, state in state_groups_ids.items(): if len(new_state_event_ids) != len(state): continue - old_state_event_ids = set(itervalues(state)) + old_state_event_ids = set(state.values()) if new_state_event_ids == old_state_event_ids: # got an exact match. return _StateCacheEntry(state=new_state, state_group=sg) @@ -579,8 +580,8 @@ def _make_state_cache_entry(new_state, state_groups_ids): prev_group = None delta_ids = None - for old_group, old_state in iteritems(state_groups_ids): - n_delta_ids = {k: v for k, v in iteritems(new_state) if old_state.get(k) != v} + for old_group, old_state in state_groups_ids.items(): + n_delta_ids = {k: v for k, v in new_state.items() if old_state.get(k) != v} if not delta_ids or len(n_delta_ids) < len(delta_ids): prev_group = old_group delta_ids = n_delta_ids @@ -591,6 +592,7 @@ def _make_state_cache_entry(new_state, state_groups_ids): def resolve_events_with_store( + clock: Clock, room_id: str, room_version: str, state_sets: List[StateMap[str]], @@ -627,7 +629,7 @@ def resolve_events_with_store( ) else: return v2.resolve_events_with_store( - room_id, room_version, state_sets, event_map, state_res_store + clock, room_id, room_version, state_sets, event_map, state_res_store ) diff --git a/synapse/state/v1.py b/synapse/state/v1.py index 9bf98d06f2..7b531a8337 100644 --- a/synapse/state/v1.py +++ b/synapse/state/v1.py @@ -17,8 +17,6 @@ import hashlib import logging from typing import Callable, Dict, List, Optional -from six import iteritems, iterkeys, itervalues - from twisted.internet import defer from synapse import event_auth @@ -70,11 +68,11 @@ def resolve_events_with_store( unconflicted_state, conflicted_state = _seperate(state_sets) needed_events = { - event_id for event_ids in itervalues(conflicted_state) for event_id in event_ids + event_id for event_ids in conflicted_state.values() for event_id in event_ids } needed_event_count = len(needed_events) if event_map is not None: - needed_events -= set(iterkeys(event_map)) + needed_events -= set(event_map.keys()) logger.info( "Asking for %d/%d conflicted events", len(needed_events), needed_event_count @@ -102,11 +100,11 @@ def resolve_events_with_store( unconflicted_state, conflicted_state, state_map ) - new_needed_events = set(itervalues(auth_events)) + new_needed_events = set(auth_events.values()) new_needed_event_count = len(new_needed_events) new_needed_events -= needed_events if event_map is not None: - new_needed_events -= set(iterkeys(event_map)) + new_needed_events -= set(event_map.keys()) logger.info( "Asking for %d/%d auth events", len(new_needed_events), new_needed_event_count @@ -152,7 +150,7 @@ def _seperate(state_sets): conflicted_state = {} for state_set in state_set_iterator: - for key, value in iteritems(state_set): + for key, value in state_set.items(): # Check if there is an unconflicted entry for the state key. unconflicted_value = unconflicted_state.get(key) if unconflicted_value is None: @@ -178,7 +176,7 @@ def _seperate(state_sets): def _create_auth_events_from_maps(unconflicted_state, conflicted_state, state_map): auth_events = {} - for event_ids in itervalues(conflicted_state): + for event_ids in conflicted_state.values(): for event_id in event_ids: if event_id in state_map: keys = event_auth.auth_types_for_event(state_map[event_id]) @@ -194,7 +192,7 @@ def _resolve_with_state( unconflicted_state_ids, conflicted_state_ids, auth_event_ids, state_map ): conflicted_state = {} - for key, event_ids in iteritems(conflicted_state_ids): + for key, event_ids in conflicted_state_ids.items(): events = [state_map[ev_id] for ev_id in event_ids if ev_id in state_map] if len(events) > 1: conflicted_state[key] = events @@ -203,7 +201,7 @@ def _resolve_with_state( auth_events = { key: state_map[ev_id] - for key, ev_id in iteritems(auth_event_ids) + for key, ev_id in auth_event_ids.items() if ev_id in state_map } @@ -214,7 +212,7 @@ def _resolve_with_state( raise new_state = unconflicted_state_ids - for key, event in iteritems(resolved_state): + for key, event in resolved_state.items(): new_state[key] = event.event_id return new_state @@ -238,21 +236,21 @@ def _resolve_state_events(conflicted_state, auth_events): auth_events.update(resolved_state) - for key, events in iteritems(conflicted_state): + for key, events in conflicted_state.items(): if key[0] == EventTypes.JoinRules: logger.debug("Resolving conflicted join rules %r", events) resolved_state[key] = _resolve_auth_events(events, auth_events) auth_events.update(resolved_state) - for key, events in iteritems(conflicted_state): + for key, events in conflicted_state.items(): if key[0] == EventTypes.Member: logger.debug("Resolving conflicted member lists %r", events) resolved_state[key] = _resolve_auth_events(events, auth_events) auth_events.update(resolved_state) - for key, events in iteritems(conflicted_state): + for key, events in conflicted_state.items(): if key not in resolved_state: logger.debug("Resolving conflicted state %r:%r", key, events) resolved_state[key] = _resolve_normal_events(events, auth_events) diff --git a/synapse/state/v2.py b/synapse/state/v2.py index 18484e2fa6..bf6caa0946 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py @@ -18,8 +18,6 @@ import itertools import logging from typing import Dict, List, Optional -from six import iteritems, itervalues - from twisted.internet import defer import synapse.state @@ -29,12 +27,20 @@ from synapse.api.errors import AuthError from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase from synapse.types import StateMap +from synapse.util import Clock logger = logging.getLogger(__name__) +# We want to yield to the reactor occasionally during state res when dealing +# with large data sets, so that we don't exhaust the reactor. This is done by +# yielding to reactor during loops every N iterations. +_YIELD_AFTER_ITERATIONS = 100 + + @defer.inlineCallbacks def resolve_events_with_store( + clock: Clock, room_id: str, room_version: str, state_sets: List[StateMap[str]], @@ -44,13 +50,11 @@ def resolve_events_with_store( """Resolves the state using the v2 state resolution algorithm Args: + clock room_id: the room we are working in - room_version: The room version - state_sets: List of dicts of (type, state_key) -> event_id, which are the different state groups to resolve. - event_map: a dict from event_id to event, for any events that we happen to have in flight (eg, those currently being persisted). This will be @@ -87,7 +91,7 @@ def resolve_events_with_store( full_conflicted_set = set( itertools.chain( - itertools.chain.from_iterable(itervalues(conflicted_state)), auth_diff + itertools.chain.from_iterable(conflicted_state.values()), auth_diff ) ) @@ -115,13 +119,14 @@ def resolve_events_with_store( ) sorted_power_events = yield _reverse_topological_power_sort( - room_id, power_events, event_map, state_res_store, full_conflicted_set + clock, room_id, power_events, event_map, state_res_store, full_conflicted_set ) logger.debug("sorted %d power events", len(sorted_power_events)) # Now sequentially auth each one resolved_state = yield _iterative_auth_checks( + clock, room_id, room_version, sorted_power_events, @@ -135,20 +140,22 @@ def resolve_events_with_store( # OK, so we've now resolved the power events. Now sort the remaining # events using the mainline of the resolved power level. + set_power_events = set(sorted_power_events) leftover_events = [ - ev_id for ev_id in full_conflicted_set if ev_id not in sorted_power_events + ev_id for ev_id in full_conflicted_set if ev_id not in set_power_events ] logger.debug("sorting %d remaining events", len(leftover_events)) pl = resolved_state.get((EventTypes.PowerLevels, ""), None) leftover_events = yield _mainline_sort( - room_id, leftover_events, pl, event_map, state_res_store + clock, room_id, leftover_events, pl, event_map, state_res_store ) logger.debug("resolving remaining events") resolved_state = yield _iterative_auth_checks( + clock, room_id, room_version, leftover_events, @@ -318,12 +325,13 @@ def _add_event_and_auth_chain_to_graph( @defer.inlineCallbacks def _reverse_topological_power_sort( - room_id, event_ids, event_map, state_res_store, auth_diff + clock, room_id, event_ids, event_map, state_res_store, auth_diff ): """Returns a list of the event_ids sorted by reverse topological ordering, and then by power level and origin_server_ts Args: + clock (Clock) room_id (str): the room we are working in event_ids (list[str]): The events to sort event_map (dict[str,FrozenEvent]) @@ -335,18 +343,28 @@ def _reverse_topological_power_sort( """ graph = {} - for event_id in event_ids: + for idx, event_id in enumerate(event_ids, start=1): yield _add_event_and_auth_chain_to_graph( graph, room_id, event_id, event_map, state_res_store, auth_diff ) + # We yield occasionally when we're working with large data sets to + # ensure that we don't block the reactor loop for too long. + if idx % _YIELD_AFTER_ITERATIONS == 0: + yield clock.sleep(0) + event_to_pl = {} - for event_id in graph: + for idx, event_id in enumerate(graph, start=1): pl = yield _get_power_level_for_sender( room_id, event_id, event_map, state_res_store ) event_to_pl[event_id] = pl + # We yield occasionally when we're working with large data sets to + # ensure that we don't block the reactor loop for too long. + if idx % _YIELD_AFTER_ITERATIONS == 0: + yield clock.sleep(0) + def _get_power_order(event_id): ev = event_map[event_id] pl = event_to_pl[event_id] @@ -362,12 +380,13 @@ def _reverse_topological_power_sort( @defer.inlineCallbacks def _iterative_auth_checks( - room_id, room_version, event_ids, base_state, event_map, state_res_store + clock, room_id, room_version, event_ids, base_state, event_map, state_res_store ): """Sequentially apply auth checks to each event in given list, updating the state as it goes along. Args: + clock (Clock) room_id (str) room_version (str) event_ids (list[str]): Ordered list of events to apply auth checks to @@ -381,7 +400,7 @@ def _iterative_auth_checks( resolved_state = base_state.copy() room_version_obj = KNOWN_ROOM_VERSIONS[room_version] - for event_id in event_ids: + for idx, event_id in enumerate(event_ids, start=1): event = event_map[event_id] auth_events = {} @@ -419,17 +438,23 @@ def _iterative_auth_checks( except AuthError: pass + # We yield occasionally when we're working with large data sets to + # ensure that we don't block the reactor loop for too long. + if idx % _YIELD_AFTER_ITERATIONS == 0: + yield clock.sleep(0) + return resolved_state @defer.inlineCallbacks def _mainline_sort( - room_id, event_ids, resolved_power_event_id, event_map, state_res_store + clock, room_id, event_ids, resolved_power_event_id, event_map, state_res_store ): """Returns a sorted list of event_ids sorted by mainline ordering based on the given event resolved_power_event_id Args: + clock (Clock) room_id (str): room we're working in event_ids (list[str]): Events to sort resolved_power_event_id (str): The final resolved power level event ID @@ -439,8 +464,14 @@ def _mainline_sort( Returns: Deferred[list[str]]: The sorted list """ + if not event_ids: + # It's possible for there to be no event IDs here to sort, so we can + # skip calculating the mainline in that case. + return [] + mainline = [] pl = resolved_power_event_id + idx = 0 while pl: mainline.append(pl) pl_ev = yield _get_event(room_id, pl, event_map, state_res_store) @@ -454,17 +485,29 @@ def _mainline_sort( pl = aid break + # We yield occasionally when we're working with large data sets to + # ensure that we don't block the reactor loop for too long. + if idx != 0 and idx % _YIELD_AFTER_ITERATIONS == 0: + yield clock.sleep(0) + + idx += 1 + mainline_map = {ev_id: i + 1 for i, ev_id in enumerate(reversed(mainline))} event_ids = list(event_ids) order_map = {} - for ev_id in event_ids: + for idx, ev_id in enumerate(event_ids, start=1): depth = yield _get_mainline_depth_for_event( event_map[ev_id], mainline_map, event_map, state_res_store ) order_map[ev_id] = (depth, event_map[ev_id].origin_server_ts, ev_id) + # We yield occasionally when we're working with large data sets to + # ensure that we don't block the reactor loop for too long. + if idx % _YIELD_AFTER_ITERATIONS == 0: + yield clock.sleep(0) + event_ids.sort(key=lambda ev_id: order_map[ev_id]) return event_ids @@ -572,7 +615,7 @@ def lexicographical_topological_sort(graph, key): # `(key(node), node)` so that sorting does the right thing zero_outdegree = [] - for node, edges in iteritems(graph): + for node, edges in graph.items(): if len(edges) == 0: zero_outdegree.append((key(node), node)) diff --git a/synapse/static/client/login/index.html b/synapse/static/client/login/index.html index 6fefdaaff7..9e6daf38ac 100644 --- a/synapse/static/client/login/index.html +++ b/synapse/static/client/login/index.html @@ -1,24 +1,24 @@ <!doctype html> <html> <head> -<title> Login </title> -<meta name='viewport' content='width=device-width, initial-scale=1, user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'> -<link rel="stylesheet" href="style.css"> -<script src="js/jquery-3.4.1.min.js"></script> -<script src="js/login.js"></script> + <meta http-equiv="Content-Type" content="text/html; charset=utf-8"> + <title> Login </title> + <meta name='viewport' content='width=device-width, initial-scale=1, user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'> + <link rel="stylesheet" href="style.css"> + <script src="js/jquery-3.4.1.min.js"></script> + <script src="js/login.js"></script> </head> <body onload="matrixLogin.onLoad()"> - <center> - <br/> + <div id="container"> <h1 id="title"></h1> - <span id="feedback" style="color: #f00"></span> + <span id="feedback"></span> <div id="loading"> <img src="spinner.gif" /> </div> - <div id="sso_flow" class="login_flow" style="display:none"> + <div id="sso_flow" class="login_flow" style="display: none;"> Single-sign on: <form id="sso_form" action="/_matrix/client/r0/login/sso/redirect" method="get"> <input id="sso_redirect_url" type="hidden" name="redirectUrl" value=""/> @@ -26,9 +26,9 @@ </form> </div> - <div id="password_flow" class="login_flow" style="display:none"> + <div id="password_flow" class="login_flow" style="display: none;"> Password Authentication: - <form onsubmit="matrixLogin.password_login(); return false;"> + <form onsubmit="matrixLogin.passwordLogin(); return false;"> <input id="user_id" size="32" type="text" placeholder="Matrix ID (e.g. bob)" autocapitalize="off" autocorrect="off" /> <br/> <input id="password" size="32" type="password" placeholder="Password"/> @@ -38,9 +38,9 @@ </form> </div> - <div id="no_login_types" type="button" class="login_flow" style="display:none"> + <div id="no_login_types" type="button" class="login_flow" style="display: none;"> Log in currently unavailable. </div> - </center> + </div> </body> </html> diff --git a/synapse/static/client/login/js/login.js b/synapse/static/client/login/js/login.js index ba8048b23f..3678670ec7 100644 --- a/synapse/static/client/login/js/login.js +++ b/synapse/static/client/login/js/login.js @@ -5,11 +5,11 @@ window.matrixLogin = { }; // Titles get updated through the process to give users feedback. -var TITLE_PRE_AUTH = "Log in with one of the following methods"; -var TITLE_POST_AUTH = "Logging in..."; +const TITLE_PRE_AUTH = "Log in with one of the following methods"; +const TITLE_POST_AUTH = "Logging in..."; // The cookie used to store the original query parameters when using SSO. -var COOKIE_KEY = "synapse_login_fallback_qs"; +const COOKIE_KEY = "synapse_login_fallback_qs"; /* * Submit a login request. @@ -20,9 +20,9 @@ var COOKIE_KEY = "synapse_login_fallback_qs"; * login request, e.g. device_id. * callback: (Optional) Function to call on successful login. */ -var submitLogin = function(type, data, extra, callback) { +function submitLogin(type, data, extra, callback) { console.log("Logging in with " + type); - set_title(TITLE_POST_AUTH); + setTitle(TITLE_POST_AUTH); // Add the login type. data.type = type; @@ -41,12 +41,15 @@ var submitLogin = function(type, data, extra, callback) { } matrixLogin.onLogin(response); }).fail(errorFunc); -}; +} -var errorFunc = function(err) { +/* + * Display an error to the user and show the login form again. + */ +function errorFunc(err) { // We want to show the error to the user rather than redirecting immediately to the // SSO portal (if SSO is the only login option), so we inhibit the redirect. - show_login(true); + showLogin(true); if (err.responseJSON && err.responseJSON.error) { setFeedbackString(err.responseJSON.error + " (" + err.responseJSON.errcode + ")"); @@ -54,27 +57,42 @@ var errorFunc = function(err) { else { setFeedbackString("Request failed: " + err.status); } -}; +} -var setFeedbackString = function(text) { +/* + * Display an error to the user. + */ +function setFeedbackString(text) { $("#feedback").text(text); -}; +} -var show_login = function(inhibit_redirect) { - // Set the redirect to come back to this page, a login token will get added - // and handled after the redirect. - var this_page = window.location.origin + window.location.pathname; - $("#sso_redirect_url").val(this_page); +/* + * (Maybe) Show the login forms. + * + * This actually does a few unrelated functions: + * + * * Configures the SSO redirect URL to come back to this page. + * * Configures and shows the SSO form, if the server supports SSO. + * * Otherwise, shows the password form. + */ +function showLogin(inhibitRedirect) { + setTitle(TITLE_PRE_AUTH); - // If inhibit_redirect is false, and SSO is the only supported login method, + // If inhibitRedirect is false, and SSO is the only supported login method, // we can redirect straight to the SSO page. if (matrixLogin.serverAcceptsSso) { + // Set the redirect to come back to this page, a login token will get + // added as a query parameter and handled after the redirect. + $("#sso_redirect_url").val(window.location.origin + window.location.pathname); + // Before submitting SSO, set the current query parameters into a cookie // for retrieval later. var qs = parseQsFromUrl(); setCookie(COOKIE_KEY, JSON.stringify(qs)); - if (!inhibit_redirect && !matrixLogin.serverAcceptsPassword) { + // If password is not supported and redirects are allowed, then submit + // the form (redirecting to the SSO provider). + if (!inhibitRedirect && !matrixLogin.serverAcceptsPassword) { $("#sso_form").submit(); return; } @@ -87,30 +105,39 @@ var show_login = function(inhibit_redirect) { $("#password_flow").show(); } + // If neither password or SSO are supported, show an error to the user. if (!matrixLogin.serverAcceptsPassword && !matrixLogin.serverAcceptsSso) { $("#no_login_types").show(); } - set_title(TITLE_PRE_AUTH); - $("#loading").hide(); -}; +} -var show_spinner = function() { +/* + * Hides the forms and shows a loading throbber. + */ +function showSpinner() { $("#password_flow").hide(); $("#sso_flow").hide(); $("#no_login_types").hide(); $("#loading").show(); -}; +} -var set_title = function(title) { +/* + * Helper to show the page's main title. + */ +function setTitle(title) { $("#title").text(title); -}; +} -var fetch_info = function(cb) { +/* + * Query the login endpoint for the homeserver's supported flows. + * + * This populates matrixLogin.serverAccepts* variables. + */ +function fetchLoginFlows(cb) { $.get(matrixLogin.endpoint, function(response) { - var serverAcceptsPassword = false; - for (var i=0; i<response.flows.length; i++) { + for (var i = 0; i < response.flows.length; i++) { var flow = response.flows[i]; if ("m.login.sso" === flow.type) { matrixLogin.serverAcceptsSso = true; @@ -126,27 +153,41 @@ var fetch_info = function(cb) { }).fail(errorFunc); } +/* + * Called on load to fetch login flows and attempt SSO login (if a token is available). + */ matrixLogin.onLoad = function() { - fetch_info(function() { - if (!try_token()) { - show_login(false); + fetchLoginFlows(function() { + // (Maybe) attempt logging in via SSO if a token is available. + if (!tryTokenLogin()) { + showLogin(false); } }); }; -matrixLogin.password_login = function() { +/* + * Submit simple user & password login. + */ +matrixLogin.passwordLogin = function() { var user = $("#user_id").val(); var pwd = $("#password").val(); setFeedbackString(""); - show_spinner(); + showSpinner(); submitLogin( "m.login.password", {user: user, password: pwd}, parseQsFromUrl()); }; +/* + * The onLogin function gets called after a succesful login. + * + * It is expected that implementations override this to be notified when the + * login is complete. The response to the login call is provided as the single + * parameter. + */ matrixLogin.onLogin = function(response) { // clobber this function console.warn("onLogin - This function should be replaced to proceed."); @@ -155,7 +196,7 @@ matrixLogin.onLogin = function(response) { /* * Process the query parameters from the current URL into an object. */ -var parseQsFromUrl = function() { +function parseQsFromUrl() { var pos = window.location.href.indexOf("?"); if (pos == -1) { return {}; @@ -174,12 +215,12 @@ var parseQsFromUrl = function() { result[key] = val; }); return result; -}; +} /* * Process the cookies and return an object. */ -var parseCookies = function() { +function parseCookies() { var allCookies = document.cookie; var result = {}; allCookies.split(";").forEach(function(part) { @@ -196,32 +237,32 @@ var parseCookies = function() { result[key] = val; }); return result; -}; +} /* * Set a cookie that is valid for 1 hour. */ -var setCookie = function(key, value) { +function setCookie(key, value) { // The maximum age is set in seconds. var maxAge = 60 * 60; // Set the cookie, this defaults to the current domain and path. document.cookie = key + "=" + encodeURIComponent(value) + ";max-age=" + maxAge + ";sameSite=lax"; -}; +} /* * Removes a cookie by key. */ -var deleteCookie = function(key) { +function deleteCookie(key) { // Delete a cookie by setting the expiration to 0. (Note that the value // doesn't matter.) document.cookie = key + "=deleted;expires=0"; -}; +} /* * Submits the login token if one is found in the query parameters. Returns a * boolean of whether the login token was found or not. */ -var try_token = function() { +function tryTokenLogin() { // Check if the login token is in the query parameters. var qs = parseQsFromUrl(); @@ -233,18 +274,18 @@ var try_token = function() { // Retrieve the original query parameters (from before the SSO redirect). // They are stored as JSON in a cookie. var cookies = parseCookies(); - var original_query_params = JSON.parse(cookies[COOKIE_KEY] || "{}") + var originalQueryParams = JSON.parse(cookies[COOKIE_KEY] || "{}") // If the login is successful, delete the cookie. - var callback = function() { + function callback() { deleteCookie(COOKIE_KEY); } submitLogin( "m.login.token", {token: loginToken}, - original_query_params, + originalQueryParams, callback); return true; -}; +} diff --git a/synapse/static/client/login/style.css b/synapse/static/client/login/style.css index 1cce5ed950..83e4f6abc8 100644 --- a/synapse/static/client/login/style.css +++ b/synapse/static/client/login/style.css @@ -31,20 +31,44 @@ form { margin: 10px 0 0 0; } +/* + * Add some padding to the viewport. + */ +#container { + padding: 10px; +} +/* + * Center all direct children of the main form. + */ +#container > * { + display: block; + margin-left: auto; + margin-right: auto; + text-align: center; +} + +/* + * A wrapper around each login flow. + */ .login_flow { width: 300px; text-align: left; padding: 10px; margin-bottom: 40px; - -webkit-border-radius: 10px; - -moz-border-radius: 10px; border-radius: 10px; - - -webkit-box-shadow: 0px 0px 20px 0px rgba(0,0,0,0.15); - -moz-box-shadow: 0px 0px 20px 0px rgba(0,0,0,0.15); box-shadow: 0px 0px 20px 0px rgba(0,0,0,0.15); background-color: #f8f8f8; border: 1px #ccc solid; } + +/* + * Used to show error content. + */ +#feedback { + /* Red text. */ + color: #ff0000; + /* A little space to not overlap the box-shadow. */ + margin-bottom: 20px; +} diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py index eac5a4e55b..d30766e543 100644 --- a/synapse/storage/data_stores/main/cache.py +++ b/synapse/storage/data_stores/main/cache.py @@ -19,7 +19,9 @@ import logging from typing import Any, Iterable, Optional, Tuple from synapse.api.constants import EventTypes +from synapse.replication.tcp.streams import BackfillStream, CachesStream from synapse.replication.tcp.streams.events import ( + EventsStream, EventsStreamCurrentStateRow, EventsStreamEventRow, ) @@ -71,10 +73,10 @@ class CacheInvalidationWorkerStore(SQLBaseStore): ) def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == "events": + if stream_name == EventsStream.NAME: for row in rows: self._process_event_stream_row(token, row) - elif stream_name == "backfill": + elif stream_name == BackfillStream.NAME: for row in rows: self._invalidate_caches_for_event( -token, @@ -86,7 +88,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): row.relates_to, backfilled=True, ) - elif stream_name == "caches": + elif stream_name == CachesStream.NAME: if self._cache_id_gen: self._cache_id_gen.advance(instance_name, token) diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py index 71f8d43a76..995d4764a9 100644 --- a/synapse/storage/data_stores/main/client_ips.py +++ b/synapse/storage/data_stores/main/client_ips.py @@ -15,8 +15,6 @@ import logging -from six import iteritems - from twisted.internet import defer from synapse.metrics.background_process_metrics import wrap_as_background_process @@ -421,7 +419,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): ): self.database_engine.lock_table(txn, "user_ips") - for entry in iteritems(to_update): + for entry in to_update.items(): (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry try: @@ -530,7 +528,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): "user_agent": user_agent, "last_seen": last_seen, } - for (access_token, ip), (user_agent, last_seen) in iteritems(results) + for (access_token, ip), (user_agent, last_seen) in results.items() ] @wrap_as_background_process("prune_old_user_ips") diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index fb9f798e29..0ff0542453 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -17,8 +17,6 @@ import logging from typing import List, Optional, Set, Tuple -from six import iteritems - from canonicaljson import json from twisted.internet import defer @@ -208,7 +206,7 @@ class DeviceWorkerStore(SQLBaseStore): ) # add the updated cross-signing keys to the results list - for user_id, result in iteritems(cross_signing_keys_by_user): + for user_id, result in cross_signing_keys_by_user.items(): result["user_id"] = user_id # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec results.append(("org.matrix.signing_key_update", result)) @@ -269,7 +267,7 @@ class DeviceWorkerStore(SQLBaseStore): ) results = [] - for user_id, user_devices in iteritems(devices): + for user_id, user_devices in devices.items(): # The prev_id for the first row is always the last row before # `from_stream_id` prev_id = yield self._get_last_device_update_for_remote_user( @@ -493,7 +491,7 @@ class DeviceWorkerStore(SQLBaseStore): if devices: user_devices = devices[user_id] results = [] - for device_id, device in iteritems(user_devices): + for device_id, device in user_devices.items(): result = {"device_id": device_id} key_json = device.get("key_json", None) diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py index 20698bfd16..1a0842d4b0 100644 --- a/synapse/storage/data_stores/main/end_to_end_keys.py +++ b/synapse/storage/data_stores/main/end_to_end_keys.py @@ -16,8 +16,6 @@ # limitations under the License. from typing import Dict, List -from six import iteritems - from canonicaljson import encode_canonical_json, json from twisted.enterprise.adbapi import Connection @@ -64,9 +62,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore): # Build the result structure, un-jsonify the results, and add the # "unsigned" section rv = {} - for user_id, device_keys in iteritems(results): + for user_id, device_keys in results.items(): rv[user_id] = {} - for device_id, device_info in iteritems(device_keys): + for device_id, device_info in device_keys.items(): r = db_to_json(device_info.pop("key_json")) r["unsigned"] = {} display_name = device_info["device_display_name"] diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py index 24ce8c4330..a6bb3221ff 100644 --- a/synapse/storage/data_stores/main/event_federation.py +++ b/synapse/storage/data_stores/main/event_federation.py @@ -14,10 +14,9 @@ # limitations under the License. import itertools import logging +from queue import Empty, PriorityQueue from typing import Dict, List, Optional, Set, Tuple -from six.moves.queue import Empty, PriorityQueue - from twisted.internet import defer from synapse.api.errors import StoreError diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index 0321274de2..bc9f4f08ea 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -16,8 +16,6 @@ import logging -from six import iteritems - from canonicaljson import json from twisted.internet import defer @@ -455,7 +453,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): sql, ( _gen_entry(user_id, actions) - for user_id, actions in iteritems(user_id_actions) + for user_id, actions in user_id_actions.items() ), ) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index a6572571b4..cfd24d2f06 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -21,9 +21,6 @@ from collections import OrderedDict, namedtuple from functools import wraps from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple -from six import integer_types, iteritems, text_type -from six.moves import range - import attr from canonicaljson import json from prometheus_client import Counter @@ -232,10 +229,10 @@ class PersistEventsStore: event_counter.labels(event.type, origin_type, origin_entity).inc() - for room_id, new_state in iteritems(current_state_for_room): + for room_id, new_state in current_state_for_room.items(): self.store.get_current_state_ids.prefill((room_id,), new_state) - for room_id, latest_event_ids in iteritems(new_forward_extremeties): + for room_id, latest_event_ids in new_forward_extremeties.items(): self.store.get_latest_event_ids_in_room.prefill( (room_id,), list(latest_event_ids) ) @@ -461,7 +458,7 @@ class PersistEventsStore: state_delta_by_room: Dict[str, DeltaState], stream_id: int, ): - for room_id, delta_state in iteritems(state_delta_by_room): + for room_id, delta_state in state_delta_by_room.items(): to_delete = delta_state.to_delete to_insert = delta_state.to_insert @@ -545,7 +542,7 @@ class PersistEventsStore: """, [ (room_id, key[0], key[1], ev_id, ev_id) - for key, ev_id in iteritems(to_insert) + for key, ev_id in to_insert.items() ], ) @@ -642,7 +639,7 @@ class PersistEventsStore: def _update_forward_extremities_txn( self, txn, new_forward_extremities, max_stream_order ): - for room_id, new_extrem in iteritems(new_forward_extremities): + for room_id, new_extrem in new_forward_extremities.items(): self.db.simple_delete_txn( txn, table="event_forward_extremities", keyvalues={"room_id": room_id} ) @@ -655,7 +652,7 @@ class PersistEventsStore: table="event_forward_extremities", values=[ {"event_id": ev_id, "room_id": room_id} - for room_id, new_extrem in iteritems(new_forward_extremities) + for room_id, new_extrem in new_forward_extremities.items() for ev_id in new_extrem ], ) @@ -672,7 +669,7 @@ class PersistEventsStore: "event_id": event_id, "stream_ordering": max_stream_order, } - for room_id, new_extrem in iteritems(new_forward_extremities) + for room_id, new_extrem in new_forward_extremities.items() for event_id in new_extrem ], ) @@ -727,7 +724,7 @@ class PersistEventsStore: event.depth, depth_updates.get(event.room_id, event.depth) ) - for room_id, depth in iteritems(depth_updates): + for room_id, depth in depth_updates.items(): self._update_min_depth_for_room_txn(txn, room_id, depth) def _update_outliers_txn(self, txn, events_and_contexts): @@ -893,8 +890,7 @@ class PersistEventsStore: "received_ts": self._clock.time_msec(), "sender": event.sender, "contains_url": ( - "url" in event.content - and isinstance(event.content["url"], text_type) + "url" in event.content and isinstance(event.content["url"], str) ), } for event, _ in events_and_contexts @@ -1345,10 +1341,10 @@ class PersistEventsStore: ): if ( "min_lifetime" in event.content - and not isinstance(event.content.get("min_lifetime"), integer_types) + and not isinstance(event.content.get("min_lifetime"), int) ) or ( "max_lifetime" in event.content - and not isinstance(event.content.get("max_lifetime"), integer_types) + and not isinstance(event.content.get("max_lifetime"), int) ): # Ignore the event if one of the value isn't an integer. return @@ -1497,11 +1493,11 @@ class PersistEventsStore: table="event_to_state_groups", values=[ {"state_group": state_group_id, "event_id": event_id} - for event_id, state_group_id in iteritems(state_groups) + for event_id, state_group_id in state_groups.items() ], ) - for event_id, state_group_id in iteritems(state_groups): + for event_id, state_group_id in state_groups.items(): txn.call_after( self.store._get_state_group_for_event.prefill, (event_id,), diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py index f54c8b1ee0..62d28f44dc 100644 --- a/synapse/storage/data_stores/main/events_bg_updates.py +++ b/synapse/storage/data_stores/main/events_bg_updates.py @@ -15,8 +15,6 @@ import logging -from six import text_type - from canonicaljson import json from twisted.internet import defer @@ -133,7 +131,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): contains_url = "url" in content if contains_url: - contains_url &= isinstance(content["url"], text_type) + contains_url &= isinstance(content["url"], str) except (KeyError, AttributeError): # If the event is missing a necessary field then # skip over it. diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 213d69100a..47a3e63589 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -38,6 +38,8 @@ from synapse.events.utils import prune_event from synapse.logging.context import PreserveLoggingContext, current_context from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker +from synapse.replication.tcp.streams import BackfillStream +from synapse.replication.tcp.streams.events import EventsStream from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.storage.database import Database from synapse.storage.util.id_generators import StreamIdGenerator @@ -113,9 +115,9 @@ class EventsWorkerStore(SQLBaseStore): self._event_fetch_ongoing = 0 def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == "events": + if stream_name == EventsStream.NAME: self._stream_id_gen.advance(token) - elif stream_name == "backfill": + elif stream_name == BackfillStream.NAME: self._backfill_id_gen.advance(-token) super().process_replication_rows(stream_name, instance_name, token, rows) @@ -1077,9 +1079,32 @@ class EventsWorkerStore(SQLBaseStore): "get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn ) - def get_all_new_backfill_event_rows(self, last_id, current_id, limit): + async def get_all_new_backfill_event_rows( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, list]], int, bool]: + """Get updates for backfill replication stream, including all new + backfilled events and events that have gone from being outliers to not. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ if last_id == current_id: - return defer.succeed([]) + return [], current_id, False def get_all_new_backfill_event_rows(txn): sql = ( @@ -1094,10 +1119,12 @@ class EventsWorkerStore(SQLBaseStore): " LIMIT ?" ) txn.execute(sql, (-last_id, -current_id, limit)) - new_event_updates = txn.fetchall() + new_event_updates = [(row[0], row[1:]) for row in txn] + limited = False if len(new_event_updates) == limit: upper_bound = new_event_updates[-1][0] + limited = True else: upper_bound = current_id @@ -1114,11 +1141,15 @@ class EventsWorkerStore(SQLBaseStore): " ORDER BY event_stream_ordering DESC" ) txn.execute(sql, (-last_id, -upper_bound)) - new_event_updates.extend(txn.fetchall()) + new_event_updates.extend((row[0], row[1:]) for row in txn) - return new_event_updates + if len(new_event_updates) >= limit: + upper_bound = new_event_updates[-1][0] + limited = True - return self.db.runInteraction( + return new_event_updates, upper_bound, limited + + return await self.db.runInteraction( "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows ) diff --git a/synapse/storage/data_stores/main/media_repository.py b/synapse/storage/data_stores/main/media_repository.py index 8aecd414c2..15bc13cbd0 100644 --- a/synapse/storage/data_stores/main/media_repository.py +++ b/synapse/storage/data_stores/main/media_repository.py @@ -81,6 +81,15 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): desc="store_local_media", ) + def mark_local_media_as_safe(self, media_id: str): + """Mark a local media as safe from quarantining.""" + return self.db.simple_update_one( + table="local_media_repository", + keyvalues={"media_id": media_id}, + updatevalues={"safe_from_quarantine": True}, + desc="mark_local_media_as_safe", + ) + def get_url_cache(self, url, ts): """Get the media_id and ts for a cached URL as of the given timestamp Returns: diff --git a/synapse/storage/data_stores/main/presence.py b/synapse/storage/data_stores/main/presence.py index dab31e0c2d..7574612619 100644 --- a/synapse/storage/data_stores/main/presence.py +++ b/synapse/storage/data_stores/main/presence.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import List, Tuple + from twisted.internet import defer from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause @@ -73,9 +75,32 @@ class PresenceStore(SQLBaseStore): ) txn.execute(sql + clause, [stream_id] + list(args)) - def get_all_presence_updates(self, last_id, current_id, limit): + async def get_all_presence_updates( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, list]], int, bool]: + """Get updates for presence replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + if last_id == current_id: - return defer.succeed([]) + return [], current_id, False def get_all_presence_updates_txn(txn): sql = """ @@ -89,9 +114,17 @@ class PresenceStore(SQLBaseStore): LIMIT ? """ txn.execute(sql, (last_id, current_id, limit)) - return txn.fetchall() + updates = [(row[0], row[1:]) for row in txn] + + upper_bound = current_id + limited = False + if len(updates) >= limit: + upper_bound = updates[-1][0] + limited = True + + return updates, upper_bound, limited - return self.db.runInteraction( + return await self.db.runInteraction( "get_all_presence_updates", get_all_presence_updates_txn ) diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py index ef8f40959f..f6e78ca590 100644 --- a/synapse/storage/data_stores/main/push_rule.py +++ b/synapse/storage/data_stores/main/push_rule.py @@ -16,7 +16,7 @@ import abc import logging -from typing import Union +from typing import List, Tuple, Union from canonicaljson import json @@ -348,23 +348,53 @@ class PushRulesWorkerStore( results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled return results - def get_all_push_rule_updates(self, last_id, current_id, limit): - """Get all the push rules changes that have happend on the server""" + async def get_all_push_rule_updates( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for push_rules replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + if last_id == current_id: - return defer.succeed([]) + return [], current_id, False def get_all_push_rule_updates_txn(txn): - sql = ( - "SELECT stream_id, event_stream_ordering, user_id, rule_id," - " op, priority_class, priority, conditions, actions" - " FROM push_rules_stream" - " WHERE ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC LIMIT ?" - ) + sql = """ + SELECT stream_id, user_id + FROM push_rules_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + LIMIT ? + """ txn.execute(sql, (last_id, current_id, limit)) - return txn.fetchall() + updates = [(stream_id, (user_id,)) for stream_id, user_id in txn] + + limited = False + upper_bound = current_id + if len(updates) == limit: + limited = True + upper_bound = updates[-1][0] + + return updates, upper_bound, limited - return self.db.runInteraction( + return await self.db.runInteraction( "get_all_push_rule_updates", get_all_push_rule_updates_txn ) diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py index cebdcd409f..8f5505bd67 100644 --- a/synapse/storage/data_stores/main/receipts.py +++ b/synapse/storage/data_stores/main/receipts.py @@ -16,6 +16,7 @@ import abc import logging +from typing import List, Tuple from canonicaljson import json @@ -24,6 +25,7 @@ from twisted.internet import defer from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.storage.database import Database from synapse.storage.util.id_generators import StreamIdGenerator +from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -266,26 +268,79 @@ class ReceiptsWorkerStore(SQLBaseStore): } return results - def get_all_updated_receipts(self, last_id, current_id, limit=None): + def get_users_sent_receipts_between(self, last_id: int, current_id: int): + """Get all users who sent receipts between `last_id` exclusive and + `current_id` inclusive. + + Returns: + Deferred[List[str]] + """ + if last_id == current_id: return defer.succeed([]) - def get_all_updated_receipts_txn(txn): - sql = ( - "SELECT stream_id, room_id, receipt_type, user_id, event_id, data" - " FROM receipts_linearized" - " WHERE ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC" - ) - args = [last_id, current_id] - if limit is not None: - sql += " LIMIT ?" - args.append(limit) - txn.execute(sql, args) + def _get_users_sent_receipts_between_txn(txn): + sql = """ + SELECT DISTINCT user_id FROM receipts_linearized + WHERE ? < stream_id AND stream_id <= ? + """ + txn.execute(sql, (last_id, current_id)) - return [r[0:5] + (json.loads(r[5]),) for r in txn] + return [r[0] for r in txn] return self.db.runInteraction( + "get_users_sent_receipts_between", _get_users_sent_receipts_between_txn + ) + + async def get_all_updated_receipts( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, list]], int, bool]: + """Get updates for receipts replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + + if last_id == current_id: + return [], current_id, False + + def get_all_updated_receipts_txn(txn): + sql = """ + SELECT stream_id, room_id, receipt_type, user_id, event_id, data + FROM receipts_linearized + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + LIMIT ? + """ + txn.execute(sql, (last_id, current_id, limit)) + + updates = [(r[0], r[1:5] + (json.loads(r[5]),)) for r in txn] + + limited = False + upper_bound = current_id + + if len(updates) == limit: + limited = True + upper_bound = updates[-1][0] + + return updates, upper_bound, limited + + return await self.db.runInteraction( "get_all_updated_receipts", get_all_updated_receipts_txn ) @@ -300,10 +355,10 @@ class ReceiptsWorkerStore(SQLBaseStore): room_id, None, update_metrics=False ) - # first handle the Deferred case - if isinstance(res, defer.Deferred): - if res.called: - res = res.result + # first handle the ObservableDeferred case + if isinstance(res, ObservableDeferred): + if res.has_called(): + res = res.get_result() else: res = None diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py index 9768981891..587d4b91c1 100644 --- a/synapse/storage/data_stores/main/registration.py +++ b/synapse/storage/data_stores/main/registration.py @@ -19,8 +19,6 @@ import logging import re from typing import Optional -from six import iterkeys - from twisted.internet import defer from twisted.internet.defer import Deferred @@ -753,7 +751,7 @@ class RegistrationWorkerStore(SQLBaseStore): last_send_attempt, validated_at FROM threepid_validation_session WHERE %s """ % ( - " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)), + " AND ".join("%s = ?" % k for k in keyvalues.keys()), ) if validated is not None: diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py index 46f643c6b9..13e366536a 100644 --- a/synapse/storage/data_stores/main/room.py +++ b/synapse/storage/data_stores/main/room.py @@ -626,36 +626,10 @@ class RoomWorkerStore(SQLBaseStore): def _quarantine_media_in_room_txn(txn): local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id) - total_media_quarantined = 0 - - # Now update all the tables to set the quarantined_by flag - - txn.executemany( - """ - UPDATE local_media_repository - SET quarantined_by = ? - WHERE media_id = ? - """, - ((quarantined_by, media_id) for media_id in local_mxcs), - ) - - txn.executemany( - """ - UPDATE remote_media_cache - SET quarantined_by = ? - WHERE media_origin = ? AND media_id = ? - """, - ( - (quarantined_by, origin, media_id) - for origin, media_id in remote_mxcs - ), + return self._quarantine_media_txn( + txn, local_mxcs, remote_mxcs, quarantined_by ) - total_media_quarantined += len(local_mxcs) - total_media_quarantined += len(remote_mxcs) - - return total_media_quarantined - return self.db.runInteraction( "quarantine_media_in_room", _quarantine_media_in_room_txn ) @@ -805,17 +779,17 @@ class RoomWorkerStore(SQLBaseStore): Returns: The total number of media items quarantined """ - total_media_quarantined = 0 - # Update all the tables to set the quarantined_by flag txn.executemany( """ UPDATE local_media_repository SET quarantined_by = ? - WHERE media_id = ? + WHERE media_id = ? AND safe_from_quarantine = ? """, - ((quarantined_by, media_id) for media_id in local_mxcs), + ((quarantined_by, media_id, False) for media_id in local_mxcs), ) + # Note that a rowcount of -1 can be used to indicate no rows were affected. + total_media_quarantined = txn.rowcount if txn.rowcount > 0 else 0 txn.executemany( """ @@ -825,9 +799,7 @@ class RoomWorkerStore(SQLBaseStore): """, ((quarantined_by, origin, media_id) for origin, media_id in remote_mxcs), ) - - total_media_quarantined += len(local_mxcs) - total_media_quarantined += len(remote_mxcs) + total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0 return total_media_quarantined diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py index 137ebac833..44bab65eac 100644 --- a/synapse/storage/data_stores/main/roommember.py +++ b/synapse/storage/data_stores/main/roommember.py @@ -17,8 +17,6 @@ import logging from typing import Iterable, List, Set -from six import iteritems, itervalues - from canonicaljson import json from twisted.internet import defer @@ -544,7 +542,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): users_in_room = {} member_event_ids = [ e_id - for key, e_id in iteritems(current_state_ids) + for key, e_id in current_state_ids.items() if key[0] == EventTypes.Member ] @@ -561,7 +559,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): users_in_room = dict(prev_res) member_event_ids = [ e_id - for key, e_id in iteritems(context.delta_ids) + for key, e_id in context.delta_ids.items() if key[0] == EventTypes.Member ] for etype, state_key in context.delta_ids: @@ -1101,7 +1099,7 @@ class _JoinedHostsCache(object): if state_entry.state_group == self.state_group: pass elif state_entry.prev_group == self.state_group: - for (typ, state_key), event_id in iteritems(state_entry.delta_ids): + for (typ, state_key), event_id in state_entry.delta_ids.items(): if typ != EventTypes.Member: continue @@ -1131,7 +1129,7 @@ class _JoinedHostsCache(object): self.state_group = state_entry.state_group else: self.state_group = object() - self._len = sum(len(v) for v in itervalues(self.hosts_to_joined_users)) + self._len = sum(len(v) for v in self.hosts_to_joined_users.values()) return frozenset(self.hosts_to_joined_users) def __len__(self): diff --git a/synapse/storage/data_stores/main/schema/delta/30/as_users.py b/synapse/storage/data_stores/main/schema/delta/30/as_users.py index 9b95411fb6..b42c02710a 100644 --- a/synapse/storage/data_stores/main/schema/delta/30/as_users.py +++ b/synapse/storage/data_stores/main/schema/delta/30/as_users.py @@ -13,8 +13,6 @@ # limitations under the License. import logging -from six.moves import range - from synapse.config.appservice import load_appservices logger = logging.getLogger(__name__) diff --git a/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.postgres b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.postgres new file mode 100644 index 0000000000..597f2ffd3d --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.postgres @@ -0,0 +1,18 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- The local_media_repository should have files which do not get quarantined, +-- e.g. files from sticker packs. +ALTER TABLE local_media_repository ADD COLUMN safe_from_quarantine BOOLEAN NOT NULL DEFAULT FALSE; diff --git a/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.sqlite b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.sqlite new file mode 100644 index 0000000000..69db89ac0e --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.sqlite @@ -0,0 +1,18 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- The local_media_repository should have files which do not get quarantined, +-- e.g. files from sticker packs. +ALTER TABLE local_media_repository ADD COLUMN safe_from_quarantine BOOLEAN NOT NULL DEFAULT 0; diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py index 13f49d8060..a8381dc577 100644 --- a/synapse/storage/data_stores/main/search.py +++ b/synapse/storage/data_stores/main/search.py @@ -17,8 +17,6 @@ import logging import re from collections import namedtuple -from six import string_types - from canonicaljson import json from twisted.internet import defer @@ -180,7 +178,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): # skip over it. continue - if not isinstance(value, string_types): + if not isinstance(value, str): # If the event body, name or topic isn't a string # then skip over it continue diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index e89f0bffb5..379d758b5d 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -40,8 +40,6 @@ import abc import logging from collections import namedtuple -from six.moves import range - from twisted.internet import defer from synapse.logging.context import make_deferred_yieldable, run_in_background diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py index 4219018302..f8c776be3f 100644 --- a/synapse/storage/data_stores/main/tags.py +++ b/synapse/storage/data_stores/main/tags.py @@ -16,8 +16,6 @@ import logging -from six.moves import range - from canonicaljson import json from twisted.internet import defer diff --git a/synapse/storage/data_stores/main/ui_auth.py b/synapse/storage/data_stores/main/ui_auth.py index 1d8ee22fb1..ec2f38c373 100644 --- a/synapse/storage/data_stores/main/ui_auth.py +++ b/synapse/storage/data_stores/main/ui_auth.py @@ -186,7 +186,7 @@ class UIAuthWorkerStore(SQLBaseStore): # The clientdict gets stored as JSON. clientdict_json = json.dumps(clientdict) - self.db.simple_update_one( + await self.db.simple_update_one( table="ui_auth_sessions", keyvalues={"session_id": session_id}, updatevalues={"clientdict": clientdict_json}, diff --git a/synapse/storage/data_stores/state/bg_updates.py b/synapse/storage/data_stores/state/bg_updates.py index ff000bc9ec..be1fe97d79 100644 --- a/synapse/storage/data_stores/state/bg_updates.py +++ b/synapse/storage/data_stores/state/bg_updates.py @@ -15,8 +15,6 @@ import logging -from six import iteritems - from twisted.internet import defer from synapse.storage._base import SQLBaseStore @@ -280,7 +278,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore): delta_state = { key: value - for key, value in iteritems(curr_state) + for key, value in curr_state.items() if prev_state.get(key, None) != value } @@ -316,7 +314,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in iteritems(delta_state) + for key, state_id in delta_state.items() ], ) diff --git a/synapse/storage/data_stores/state/store.py b/synapse/storage/data_stores/state/store.py index f3ad1e4369..5db9f20135 100644 --- a/synapse/storage/data_stores/state/store.py +++ b/synapse/storage/data_stores/state/store.py @@ -17,9 +17,6 @@ import logging from collections import namedtuple from typing import Dict, Iterable, List, Set, Tuple -from six import iteritems -from six.moves import range - from twisted.internet import defer from synapse.api.constants import EventTypes @@ -263,7 +260,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): # And finally update the result dict, by filtering out any extra # stuff we pulled out of the database. - for group, group_state_dict in iteritems(group_to_state_dict): + for group, group_state_dict in group_to_state_dict.items(): # We just replace any existing entries, as we will have loaded # everything we need from the database anyway. state[group] = state_filter.filter_state(group_state_dict) @@ -341,11 +338,11 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): else: non_member_types = non_member_filter.concrete_types() - for group, group_state_dict in iteritems(group_to_state_dict): + for group, group_state_dict in group_to_state_dict.items(): state_dict_members = {} state_dict_non_members = {} - for k, v in iteritems(group_state_dict): + for k, v in group_state_dict.items(): if k[0] == EventTypes.Member: state_dict_members[k] = v else: @@ -432,7 +429,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in iteritems(delta_ids) + for key, state_id in delta_ids.items() ], ) else: @@ -447,7 +444,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in iteritems(current_state_ids) + for key, state_id in current_state_ids.items() ], ) @@ -458,7 +455,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): current_member_state_ids = { s: ev - for (s, ev) in iteritems(current_state_ids) + for (s, ev) in current_state_ids.items() if s[0] == EventTypes.Member } txn.call_after( @@ -470,7 +467,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): current_non_member_state_ids = { s: ev - for (s, ev) in iteritems(current_state_ids) + for (s, ev) in current_state_ids.items() if s[0] != EventTypes.Member } txn.call_after( @@ -555,7 +552,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in iteritems(curr_state) + for key, state_id in curr_state.items() ], ) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index b112ff3df2..3be20c866a 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -16,6 +16,7 @@ # limitations under the License. import logging import time +from sys import intern from time import monotonic as monotonic_time from typing import ( Any, @@ -29,9 +30,6 @@ from typing import ( TypeVar, ) -from six import iteritems, iterkeys, itervalues -from six.moves import intern, range - from prometheus_client import Histogram from twisted.enterprise import adbapi @@ -259,7 +257,7 @@ class PerformanceCounters(object): def interval(self, interval_duration_secs, limit=3): counters = [] - for name, (count, cum_time) in iteritems(self.current_counters): + for name, (count, cum_time) in self.current_counters.items(): prev_count, prev_time = self.previous_counters.get(name, (0, 0)) counters.append( ( @@ -1053,7 +1051,7 @@ class Database(object): sql = ("SELECT %(retcol)s FROM %(table)s") % {"retcol": retcol, "table": table} if keyvalues: - sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)) + sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys()) txn.execute(sql, list(keyvalues.values())) else: txn.execute(sql) @@ -1191,7 +1189,7 @@ class Database(object): clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable) clauses = [clause] - for key, value in iteritems(keyvalues): + for key, value in keyvalues.items(): clauses.append("%s = ?" % (key,)) values.append(value) @@ -1212,7 +1210,7 @@ class Database(object): @staticmethod def simple_update_txn(txn, table, keyvalues, updatevalues): if keyvalues: - where = "WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)) + where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys()) else: where = "" @@ -1351,7 +1349,7 @@ class Database(object): clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable) clauses = [clause] - for key, value in iteritems(keyvalues): + for key, value in keyvalues.items(): clauses.append("%s = ?" % (key,)) values.append(value) @@ -1388,7 +1386,7 @@ class Database(object): txn.close() if cache: - min_val = min(itervalues(cache)) + min_val = min(cache.values()) else: min_val = max_value diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 6c7d08a6f2..a31588080d 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -92,7 +92,7 @@ class PostgresEngine(BaseDatabaseEngine): errors.append(" - 'COLLATE' is set to %r. Should be 'C'" % (collation,)) if ctype != "C": - errors.append(" - 'CTYPE' is set to %r. Should be 'C'" % (collation,)) + errors.append(" - 'CTYPE' is set to %r. Should be 'C'" % (ctype,)) if errors: raise IncorrectDatabaseSetup( diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index f159400a87..ec894a91cb 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -20,9 +20,6 @@ import logging from collections import deque, namedtuple from typing import Iterable, List, Optional, Set, Tuple -from six import iteritems -from six.moves import range - from prometheus_client import Counter, Histogram from twisted.internet import defer @@ -218,7 +215,7 @@ class EventsPersistenceStorage(object): partitioned.setdefault(event.room_id, []).append((event, ctx)) deferreds = [] - for room_id, evs_ctxs in iteritems(partitioned): + for room_id, evs_ctxs in partitioned.items(): d = self._event_persist_queue.add_to_queue( room_id, evs_ctxs, backfilled=backfilled ) @@ -319,7 +316,7 @@ class EventsPersistenceStorage(object): (event, context) ) - for room_id, ev_ctx_rm in iteritems(events_by_room): + for room_id, ev_ctx_rm in events_by_room.items(): latest_event_ids = await self.main_store.get_latest_event_ids_in_room( room_id ) @@ -674,7 +671,7 @@ class EventsPersistenceStorage(object): to_insert = { key: ev_id - for key, ev_id in iteritems(current_state) + for key, ev_id in current_state.items() if ev_id != existing_state.get(key) } diff --git a/synapse/storage/state.py b/synapse/storage/state.py index c522c80922..dc568476f4 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -16,8 +16,6 @@ import logging from typing import Iterable, List, TypeVar -from six import iteritems, itervalues - import attr from twisted.internet import defer @@ -51,7 +49,7 @@ class StateFilter(object): # If `include_others` is set we canonicalise the filter by removing # wildcards from the types dictionary if self.include_others: - self.types = {k: v for k, v in iteritems(self.types) if v is not None} + self.types = {k: v for k, v in self.types.items() if v is not None} @staticmethod def all(): @@ -150,7 +148,7 @@ class StateFilter(object): has_non_member_wildcard = self.include_others or any( state_keys is None - for t, state_keys in iteritems(self.types) + for t, state_keys in self.types.items() if t != EventTypes.Member ) @@ -199,7 +197,7 @@ class StateFilter(object): # First we build up a lost of clauses for each type/state_key combo clauses = [] - for etype, state_keys in iteritems(self.types): + for etype, state_keys in self.types.items(): if state_keys is None: clauses.append("(type = ?)") where_args.append(etype) @@ -251,7 +249,7 @@ class StateFilter(object): return dict(state_dict) filtered_state = {} - for k, v in iteritems(state_dict): + for k, v in state_dict.items(): typ, state_key = k if typ in self.types: state_keys = self.types[typ] @@ -279,7 +277,7 @@ class StateFilter(object): """ return self.include_others or any( - state_keys is None for state_keys in itervalues(self.types) + state_keys is None for state_keys in self.types.values() ) def concrete_types(self): @@ -292,7 +290,7 @@ class StateFilter(object): """ return [ (t, s) - for t, state_keys in iteritems(self.types) + for t, state_keys in self.types.items() if state_keys is not None for s in state_keys ] @@ -324,7 +322,7 @@ class StateFilter(object): member_filter = StateFilter.none() non_member_filter = StateFilter( - types={k: v for k, v in iteritems(self.types) if k != EventTypes.Member}, + types={k: v for k, v in self.types.items() if k != EventTypes.Member}, include_others=self.include_others, ) @@ -366,7 +364,7 @@ class StateGroupStorage(object): event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids) - groups = set(itervalues(event_to_groups)) + groups = set(event_to_groups.values()) group_to_state = yield self.stores.state._get_state_for_groups(groups) return group_to_state @@ -400,8 +398,8 @@ class StateGroupStorage(object): state_event_map = yield self.stores.main.get_events( [ ev_id - for group_ids in itervalues(group_to_ids) - for ev_id in itervalues(group_ids) + for group_ids in group_to_ids.values() + for ev_id in group_ids.values() ], get_prev_content=False, ) @@ -409,10 +407,10 @@ class StateGroupStorage(object): return { group: [ state_event_map[v] - for v in itervalues(event_id_map) + for v in event_id_map.values() if v in state_event_map ] - for group, event_id_map in iteritems(group_to_ids) + for group, event_id_map in group_to_ids.items() } def _get_state_groups_from_groups( @@ -444,23 +442,23 @@ class StateGroupStorage(object): """ event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids) - groups = set(itervalues(event_to_groups)) + groups = set(event_to_groups.values()) group_to_state = yield self.stores.state._get_state_for_groups( groups, state_filter ) state_event_map = yield self.stores.main.get_events( - [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)], + [ev_id for sd in group_to_state.values() for ev_id in sd.values()], get_prev_content=False, ) event_to_state = { event_id: { k: state_event_map[v] - for k, v in iteritems(group_to_state[group]) + for k, v in group_to_state[group].items() if v in state_event_map } - for event_id, group in iteritems(event_to_groups) + for event_id, group in event_to_groups.items() } return {event: event_to_state[event] for event in event_ids} @@ -481,14 +479,14 @@ class StateGroupStorage(object): """ event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids) - groups = set(itervalues(event_to_groups)) + groups = set(event_to_groups.values()) group_to_state = yield self.stores.state._get_state_for_groups( groups, state_filter ) event_to_state = { event_id: group_to_state[group] - for event_id, group in iteritems(event_to_groups) + for event_id, group in event_to_groups.items() } return {event: event_to_state[event] for event in event_ids} diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index f7af2bca7f..65abf0846e 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -19,8 +19,6 @@ import logging from contextlib import contextmanager from typing import Dict, Sequence, Set, Union -from six.moves import range - import attr from twisted.internet import defer @@ -95,7 +93,7 @@ class ObservableDeferred(object): This returns a brand new deferred that is resolved when the underlying deferred is resolved. Interacting with the returned deferred does not - effect the underdlying deferred. + effect the underlying deferred. """ if not self._result: d = defer.Deferred() diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index cd48262420..64f35fc288 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -21,8 +21,6 @@ import threading from typing import Any, Tuple, Union, cast from weakref import WeakValueDictionary -from six import itervalues - from prometheus_client import Gauge from typing_extensions import Protocol @@ -281,7 +279,7 @@ class Cache(object): def invalidate_all(self): self.check_thread() self.cache.clear() - for entry in itervalues(self._pending_deferred_cache): + for entry in self._pending_deferred_cache.values(): entry.invalidate() self._pending_deferred_cache.clear() diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 2726b67b6d..89a3420f92 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -16,8 +16,6 @@ import logging from collections import OrderedDict -from six import iteritems, itervalues - from synapse.config import cache as cache_config from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches import register_cache @@ -150,7 +148,7 @@ class ExpiringCache(object): keys_to_delete = set() - for key, cache_entry in iteritems(self._cache): + for key, cache_entry in self._cache.items(): if now - cache_entry.time > self._expiry_ms: keys_to_delete.add(key) @@ -170,7 +168,7 @@ class ExpiringCache(object): def __len__(self): if self.iterable: - return sum(len(entry.value) for entry in itervalues(self._cache)) + return sum(len(entry.value) for entry in self._cache.values()) else: return len(self._cache) diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 2a161bf244..c541bf4579 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -17,8 +17,6 @@ import logging import math from typing import Dict, FrozenSet, List, Mapping, Optional, Set, Union -from six import integer_types - from sortedcontainers import SortedDict from synapse.types import Collection @@ -88,7 +86,7 @@ class StreamChangeCache: def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool: """Returns True if the entity may have been updated since stream_pos """ - assert type(stream_pos) in integer_types + assert isinstance(stream_pos, int) if stream_pos < self._earliest_known_stream_pos: self.metrics.inc_misses() diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index 2ea4e4e911..ecd9948e79 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -1,7 +1,5 @@ from typing import Dict -from six import itervalues - SENTINEL = object() @@ -81,7 +79,7 @@ def iterate_tree_cache_entry(d): can contain dicts. """ if isinstance(d, dict): - for value_d in itervalues(d): + for value_d in d.values(): for value in iterate_tree_cache_entry(value_d): yield value else: diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 8b17d1c8b8..6a3f6177b1 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from six.moves import queue +import queue from twisted.internet import threads diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py index 9815bb8667..eab78dd256 100644 --- a/synapse/util/frozenutils.py +++ b/synapse/util/frozenutils.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from six import binary_type, text_type - from canonicaljson import json from frozendict import frozendict @@ -26,7 +24,7 @@ def freeze(o): if isinstance(o, frozendict): return o - if isinstance(o, (binary_type, text_type)): + if isinstance(o, (bytes, str)): return o try: @@ -41,7 +39,7 @@ def unfreeze(o): if isinstance(o, (dict, frozendict)): return dict({k: unfreeze(v) for k, v in o.items()}) - if isinstance(o, (binary_type, text_type)): + if isinstance(o, (bytes, str)): return o try: diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py index 9bf6a44f75..023beb5ede 100644 --- a/synapse/util/wheel_timer.py +++ b/synapse/util/wheel_timer.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from six.moves import range - class _Entry(object): __slots__ = ["end_key", "queue"] diff --git a/synapse/visibility.py b/synapse/visibility.py index bab41182b9..3dfd4af26c 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -16,9 +16,6 @@ import logging import operator -from six import iteritems, itervalues -from six.moves import map - from twisted.internet import defer from synapse.api.constants import EventTypes, Membership @@ -298,7 +295,7 @@ def filter_events_for_server( # membership states for the requesting server to determine # if the server is either in the room or has been invited # into the room. - for ev in itervalues(state): + for ev in state.values(): if ev.type != EventTypes.Member: continue try: @@ -332,7 +329,7 @@ def filter_events_for_server( ) visibility_ids = set() - for sids in itervalues(event_to_state_ids): + for sids in event_to_state_ids.values(): hist = sids.get((EventTypes.RoomHistoryVisibility, "")) if hist: visibility_ids.add(hist) @@ -345,7 +342,7 @@ def filter_events_for_server( event_map = yield storage.main.get_events(visibility_ids) all_open = all( e.content.get("history_visibility") in (None, "shared", "world_readable") - for e in itervalues(event_map) + for e in event_map.values() ) if not check_history_visibility_only: @@ -394,8 +391,8 @@ def filter_events_for_server( # event_id_to_state_key = { event_id: key - for key_to_eid in itervalues(event_to_state_ids) - for key, event_id in iteritems(key_to_eid) + for key_to_eid in event_to_state_ids.values() + for key, event_id in key_to_eid.items() } def include(typ, state_key): @@ -409,20 +406,16 @@ def filter_events_for_server( return state_key[idx + 1 :] == server_name event_map = yield storage.main.get_events( - [ - e_id - for e_id, key in iteritems(event_id_to_state_key) - if include(key[0], key[1]) - ] + [e_id for e_id, key in event_id_to_state_key.items() if include(key[0], key[1])] ) event_to_state = { e_id: { key: event_map[inner_e_id] - for key, inner_e_id in iteritems(key_to_eid) + for key, inner_e_id in key_to_eid.items() if inner_e_id in event_map } - for e_id, key_to_eid in iteritems(event_to_state_ids) + for e_id, key_to_eid in event_to_state_ids.items() } to_return = [] |