diff --git a/synapse/__init__.py b/synapse/__init__.py
index c371e8f3c4..f5cd8271a6 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -36,7 +36,7 @@ try:
except ImportError:
pass
-__version__ = "1.15.2"
+__version__ = "1.16.0rc1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/_scripts/register_new_matrix_user.py b/synapse/_scripts/register_new_matrix_user.py
index d528450c78..55cce2db22 100644
--- a/synapse/_scripts/register_new_matrix_user.py
+++ b/synapse/_scripts/register_new_matrix_user.py
@@ -23,8 +23,6 @@ import hmac
import logging
import sys
-from six.moves import input
-
import requests as _requests
import yaml
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 06ade25674..06ba6604f3 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -16,8 +16,6 @@
import logging
from typing import Optional
-from six import itervalues
-
import pymacaroons
from netaddr import IPAddress
@@ -90,7 +88,7 @@ class Auth(object):
event, prev_state_ids, for_verification=True
)
auth_events = yield self.store.get_events(auth_events_ids)
- auth_events = {(e.type, e.state_key): e for e in itervalues(auth_events)}
+ auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
event_auth.check(
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 5ec4a77ccd..6a6d32c302 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -150,3 +150,8 @@ class EventContentFields(object):
# Timestamp to delete the event after
# cf https://github.com/matrix-org/matrix-doc/pull/2228
SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after"
+
+
+class RoomEncryptionAlgorithms(object):
+ MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2"
+ DEFAULT = MEGOLM_V1_AES_SHA2
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index d54dfb385d..5305038c21 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -17,11 +17,9 @@
"""Contains exceptions and error codes."""
import logging
+from http import HTTPStatus
from typing import Dict, List
-from six import iteritems
-from six.moves import http_client
-
from canonicaljson import json
from twisted.web import http
@@ -174,7 +172,7 @@ class ConsentNotGivenError(SynapseError):
consent_url (str): The URL where the user can give their consent
"""
super(ConsentNotGivenError, self).__init__(
- code=http_client.FORBIDDEN, msg=msg, errcode=Codes.CONSENT_NOT_GIVEN
+ code=HTTPStatus.FORBIDDEN, msg=msg, errcode=Codes.CONSENT_NOT_GIVEN
)
self._consent_uri = consent_uri
@@ -194,7 +192,7 @@ class UserDeactivatedError(SynapseError):
msg (str): The human-readable error message
"""
super(UserDeactivatedError, self).__init__(
- code=http_client.FORBIDDEN, msg=msg, errcode=Codes.USER_DEACTIVATED
+ code=HTTPStatus.FORBIDDEN, msg=msg, errcode=Codes.USER_DEACTIVATED
)
@@ -497,7 +495,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
A dict representing the error response JSON.
"""
err = {"error": msg, "errcode": code}
- for key, value in iteritems(kwargs):
+ for key, value in kwargs.items():
err[key] = value
return err
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 8b64d0a285..f988f62a1e 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -17,8 +17,6 @@
# limitations under the License.
from typing import List
-from six import text_type
-
import jsonschema
from canonicaljson import json
from jsonschema import FormatChecker
@@ -313,7 +311,7 @@ class Filter(object):
content = event.get("content", {})
# check if there is a string url field in the content for filtering purposes
- contains_url = isinstance(content.get("url"), text_type)
+ contains_url = isinstance(content.get("url"), str)
labels = content.get(EventContentFields.LABELS, [])
return self.check_fields(room_id, sender, ev_type, labels, contains_url)
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index f34434bd67..bd03ebca5a 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -17,8 +17,7 @@
"""Contains the URL paths to prefix various aspects of the server with. """
import hmac
from hashlib import sha256
-
-from six.moves.urllib.parse import urlencode
+from urllib.parse import urlencode
from synapse.config import ConfigError
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index dedff81af3..373a80a4a7 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -20,6 +20,7 @@ import signal
import socket
import sys
import traceback
+from typing import Iterable
from daemonize import Daemonize
from typing_extensions import NoReturn
@@ -29,6 +30,7 @@ from twisted.protocols.tls import TLSMemoryBIOFactory
import synapse
from synapse.app import check_bind_error
+from synapse.config.server import ListenerConfig
from synapse.crypto import context_factory
from synapse.logging.context import PreserveLoggingContext
from synapse.util.async_helpers import Linearizer
@@ -234,7 +236,7 @@ def refresh_certificate(hs):
logger.info("Context factories updated.")
-def start(hs, listeners=None):
+def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
"""
Start a Synapse server or worker.
@@ -245,8 +247,8 @@ def start(hs, listeners=None):
notify systemd.
Args:
- hs (synapse.server.HomeServer)
- listeners (list[dict]): Listener configuration ('listeners' in homeserver.yaml)
+ hs: homeserver instance
+ listeners: Listener configuration ('listeners' in homeserver.yaml)
"""
try:
# Set up the SIGHUP machinery.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index f3ec2a34ec..27a3fc9ed6 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -37,6 +37,7 @@ from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
+from synapse.config.server import ListenerConfig
from synapse.federation import send_queue
from synapse.federation.transport.server import TransportLayerServer
from synapse.handlers.presence import (
@@ -514,13 +515,18 @@ class GenericWorkerSlavedStore(
class GenericWorkerServer(HomeServer):
DATASTORE_CLASS = GenericWorkerSlavedStore
- def _listen_http(self, listener_config):
- port = listener_config["port"]
- bind_addresses = listener_config["bind_addresses"]
- site_tag = listener_config.get("tag", port)
+ def _listen_http(self, listener_config: ListenerConfig):
+ port = listener_config.port
+ bind_addresses = listener_config.bind_addresses
+
+ assert listener_config.http_options is not None
+
+ site_tag = listener_config.http_options.tag
+ if site_tag is None:
+ site_tag = port
resources = {}
- for res in listener_config["resources"]:
- for name in res["names"]:
+ for res in listener_config.http_options.resources:
+ for name in res.names:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
@@ -590,7 +596,7 @@ class GenericWorkerServer(HomeServer):
" repository is disabled. Ignoring."
)
- if name == "openid" and "federation" not in res["names"]:
+ if name == "openid" and "federation" not in res.names:
# Only load the openid resource separately if federation resource
# is not specified since federation resource includes openid
# resource.
@@ -625,19 +631,19 @@ class GenericWorkerServer(HomeServer):
logger.info("Synapse worker now listening on port %d", port)
- def start_listening(self, listeners):
+ def start_listening(self, listeners: Iterable[ListenerConfig]):
for listener in listeners:
- if listener["type"] == "http":
+ if listener.type == "http":
self._listen_http(listener)
- elif listener["type"] == "manhole":
+ elif listener.type == "manhole":
_base.listen_tcp(
- listener["bind_addresses"],
- listener["port"],
+ listener.bind_addresses,
+ listener.port,
manhole(
username="matrix", password="rabbithole", globals={"hs": self}
),
)
- elif listener["type"] == "metrics":
+ elif listener.type == "metrics":
if not self.get_config().enable_metrics:
logger.warning(
(
@@ -646,9 +652,9 @@ class GenericWorkerServer(HomeServer):
)
)
else:
- _base.listen_metrics(listener["bind_addresses"], listener["port"])
+ _base.listen_metrics(listener.bind_addresses, listener.port)
else:
- logger.warning("Unrecognized listener type: %s", listener["type"])
+ logger.warning("Unsupported listener type: %s", listener.type)
self.get_tcp_replication().start_replication(self)
@@ -738,6 +744,11 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
except Exception:
logger.exception("Error processing replication")
+ async def on_position(self, stream_name: str, instance_name: str, token: int):
+ await super().on_position(stream_name, instance_name, token)
+ # Also call on_rdata to ensure that stream positions are properly reset.
+ await self.on_rdata(stream_name, instance_name, token, [])
+
def stop_pusher(self, user_id, app_id, pushkey):
if not self.notify_pushers:
return
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 41994dc14b..09291d86ad 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -23,8 +23,7 @@ import math
import os
import resource
import sys
-
-from six import iteritems
+from typing import Iterable
from prometheus_client import Gauge
@@ -50,6 +49,7 @@ from synapse.app import _base
from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
+from synapse.config.server import ListenerConfig
from synapse.federation.transport.server import TransportLayerServer
from synapse.http.additional_resource import AdditionalResource
from synapse.http.server import (
@@ -90,24 +90,24 @@ def gz_wrap(r):
class SynapseHomeServer(HomeServer):
DATASTORE_CLASS = DataStore
- def _listener_http(self, config, listener_config):
- port = listener_config["port"]
- bind_addresses = listener_config["bind_addresses"]
- tls = listener_config.get("tls", False)
- site_tag = listener_config.get("tag", port)
+ def _listener_http(self, config: HomeServerConfig, listener_config: ListenerConfig):
+ port = listener_config.port
+ bind_addresses = listener_config.bind_addresses
+ tls = listener_config.tls
+ site_tag = listener_config.http_options.tag
+ if site_tag is None:
+ site_tag = port
resources = {}
- for res in listener_config["resources"]:
- for name in res["names"]:
- if name == "openid" and "federation" in res["names"]:
+ for res in listener_config.http_options.resources:
+ for name in res.names:
+ if name == "openid" and "federation" in res.names:
# Skip loading openid resource if federation is defined
# since federation resource will include openid
continue
- resources.update(
- self._configure_named_resource(name, res.get("compress", False))
- )
+ resources.update(self._configure_named_resource(name, res.compress))
- additional_resources = listener_config.get("additional_resources", {})
+ additional_resources = listener_config.http_options.additional_resources
logger.debug("Configuring additional resources: %r", additional_resources)
module_api = ModuleApi(self, self.get_auth_handler())
for path, resmodule in additional_resources.items():
@@ -279,7 +279,7 @@ class SynapseHomeServer(HomeServer):
return resources
- def start_listening(self, listeners):
+ def start_listening(self, listeners: Iterable[ListenerConfig]):
config = self.get_config()
if config.redis_enabled:
@@ -289,25 +289,25 @@ class SynapseHomeServer(HomeServer):
self.get_tcp_replication().start_replication(self)
for listener in listeners:
- if listener["type"] == "http":
+ if listener.type == "http":
self._listening_services.extend(self._listener_http(config, listener))
- elif listener["type"] == "manhole":
+ elif listener.type == "manhole":
listen_tcp(
- listener["bind_addresses"],
- listener["port"],
+ listener.bind_addresses,
+ listener.port,
manhole(
username="matrix", password="rabbithole", globals={"hs": self}
),
)
- elif listener["type"] == "replication":
+ elif listener.type == "replication":
services = listen_tcp(
- listener["bind_addresses"],
- listener["port"],
+ listener.bind_addresses,
+ listener.port,
ReplicationStreamProtocolFactory(self),
)
for s in services:
reactor.addSystemEventTrigger("before", "shutdown", s.stopListening)
- elif listener["type"] == "metrics":
+ elif listener.type == "metrics":
if not self.get_config().enable_metrics:
logger.warning(
(
@@ -316,9 +316,11 @@ class SynapseHomeServer(HomeServer):
)
)
else:
- _base.listen_metrics(listener["bind_addresses"], listener["port"])
+ _base.listen_metrics(listener.bind_addresses, listener.port)
else:
- logger.warning("Unrecognized listener type: %s", listener["type"])
+ # this shouldn't happen, as the listener type should have been checked
+ # during parsing
+ logger.warning("Unrecognized listener type: %s", listener.type)
# Gauges to expose monthly active user control metrics
@@ -526,7 +528,7 @@ def phone_stats_home(hs, stats, stats_process=_stats_process):
stats["total_nonbridged_users"] = total_nonbridged_users
daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
- for name, count in iteritems(daily_user_type_results):
+ for name, count in daily_user_type_results.items():
stats["daily_user_type_" + name] = count
room_count = yield hs.get_datastore().get_room_count()
@@ -538,7 +540,7 @@ def phone_stats_home(hs, stats, stats_process=_stats_process):
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
r30_results = yield hs.get_datastore().count_r30_users()
- for name, count in iteritems(r30_results):
+ for name, count in r30_results.items():
stats["r30_users_" + name] = count
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 1b13e84425..0323256472 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -15,8 +15,6 @@
import logging
import re
-from six import string_types
-
from twisted.internet import defer
from synapse.api.constants import EventTypes
@@ -156,7 +154,7 @@ class ApplicationService(object):
)
regex = regex_obj.get("regex")
- if isinstance(regex, string_types):
+ if isinstance(regex, str):
regex_obj["regex"] = re.compile(regex) # Pre-compile regex
else:
raise ValueError("Expected string for 'regex' in ns '%s'" % ns)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 57174da021..da9a5e86d4 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -13,8 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-
-from six.moves import urllib
+import urllib
from prometheus_client import Counter
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 30d1050a91..1391e5fc43 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -22,8 +22,6 @@ from collections import OrderedDict
from textwrap import dedent
from typing import Any, MutableMapping, Optional
-from six import integer_types
-
import yaml
@@ -117,7 +115,7 @@ class Config(object):
@staticmethod
def parse_size(value):
- if isinstance(value, integer_types):
+ if isinstance(value, int):
return value
sizes = {"K": 1024, "M": 1024 * 1024}
size = 1
@@ -129,7 +127,7 @@ class Config(object):
@staticmethod
def parse_duration(value):
- if isinstance(value, integer_types):
+ if isinstance(value, int):
return value
second = 1000
minute = 60 * second
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index ca43e96bd1..8ed3e24258 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -14,9 +14,7 @@
import logging
from typing import Dict
-
-from six import string_types
-from six.moves.urllib import parse as urlparse
+from urllib import parse as urlparse
import yaml
from netaddr import IPSet
@@ -98,17 +96,14 @@ def load_appservices(hostname, config_files):
def _load_appservice(hostname, as_info, config_filename):
required_string_fields = ["id", "as_token", "hs_token", "sender_localpart"]
for field in required_string_fields:
- if not isinstance(as_info.get(field), string_types):
+ if not isinstance(as_info.get(field), str):
raise KeyError(
"Required string field: '%s' (%s)" % (field, config_filename)
)
# 'url' must either be a string or explicitly null, not missing
# to avoid accidentally turning off push for ASes.
- if (
- not isinstance(as_info.get("url"), string_types)
- and as_info.get("url", "") is not None
- ):
+ if not isinstance(as_info.get("url"), str) and as_info.get("url", "") is not None:
raise KeyError(
"Required string field or explicit null: 'url' (%s)" % (config_filename,)
)
@@ -138,7 +133,7 @@ def _load_appservice(hostname, as_info, config_filename):
ns,
regex_obj,
)
- if not isinstance(regex_obj.get("regex"), string_types):
+ if not isinstance(regex_obj.get("regex"), str):
raise ValueError("Missing/bad type 'regex' key in %s", regex_obj)
if not isinstance(regex_obj.get("exclusive"), bool):
raise ValueError(
diff --git a/synapse/config/cache.py b/synapse/config/cache.py
index 0672538796..aff5b21ab2 100644
--- a/synapse/config/cache.py
+++ b/synapse/config/cache.py
@@ -15,6 +15,7 @@
import os
import re
+import threading
from typing import Callable, Dict
from ._base import Config, ConfigError
@@ -25,6 +26,9 @@ _CACHE_PREFIX = "SYNAPSE_CACHE_FACTOR"
# Map from canonicalised cache name to cache.
_CACHES = {}
+# a lock on the contents of _CACHES
+_CACHES_LOCK = threading.Lock()
+
_DEFAULT_FACTOR_SIZE = 0.5
_DEFAULT_EVENT_CACHE_SIZE = "10K"
@@ -66,7 +70,10 @@ def add_resizable_cache(cache_name: str, cache_resize_callback: Callable):
# Some caches have '*' in them which we strip out.
cache_name = _canonicalise_cache_name(cache_name)
- _CACHES[cache_name] = cache_resize_callback
+ # sometimes caches are initialised from background threads, so we need to make
+ # sure we don't conflict with another thread running a resize operation
+ with _CACHES_LOCK:
+ _CACHES[cache_name] = cache_resize_callback
# Ensure all loaded caches are sized appropriately
#
@@ -87,7 +94,8 @@ class CacheConfig(Config):
os.environ.get(_CACHE_PREFIX, _DEFAULT_FACTOR_SIZE)
)
properties.resize_all_caches_func = None
- _CACHES.clear()
+ with _CACHES_LOCK:
+ _CACHES.clear()
def generate_config_section(self, **kwargs):
return """\
@@ -193,6 +201,8 @@ class CacheConfig(Config):
For each cache, run the mapped callback function with either
a specific cache factor or the default, global one.
"""
- for cache_name, callback in _CACHES.items():
- new_factor = self.cache_factors.get(cache_name, self.global_factor)
- callback(new_factor)
+ # block other threads from modifying _CACHES while we iterate it.
+ with _CACHES_LOCK:
+ for cache_name, callback in _CACHES.items():
+ new_factor = self.cache_factors.get(cache_name, self.global_factor)
+ callback(new_factor)
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 2c7b3a699f..264c274c52 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -36,6 +36,7 @@ from .ratelimiting import RatelimitConfig
from .redis import RedisConfig
from .registration import RegistrationConfig
from .repository import ContentRepositoryConfig
+from .room import RoomConfig
from .room_directory import RoomDirectoryConfig
from .saml2_config import SAML2Config
from .server import ServerConfig
@@ -79,6 +80,7 @@ class HomeServerConfig(RootConfig):
PasswordAuthProviderConfig,
PushConfig,
SpamCheckerConfig,
+ RoomConfig,
GroupsConfig,
UserDirectoryConfig,
ConsentConfig,
diff --git a/synapse/config/oidc_config.py b/synapse/config/oidc_config.py
index e24dd637bc..e0939bce84 100644
--- a/synapse/config/oidc_config.py
+++ b/synapse/config/oidc_config.py
@@ -89,7 +89,7 @@ class OIDCConfig(Config):
# use an OpenID Connect Provider for authentication, instead of its internal
# password database.
#
- # See https://github.com/matrix-org/synapse/blob/master/openid.md.
+ # See https://github.com/matrix-org/synapse/blob/master/docs/openid.md.
#
oidc_config:
# Uncomment the following to enable authorization against an OpenID Connect
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index fecced2d57..6badf4e75d 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -18,8 +18,9 @@ from distutils.util import strtobool
import pkg_resources
+from synapse.api.constants import RoomCreationPreset
from synapse.config._base import Config, ConfigError
-from synapse.types import RoomAlias
+from synapse.types import RoomAlias, UserID
from synapse.util.stringutils import random_string_with_symbols
@@ -127,7 +128,50 @@ class RegistrationConfig(Config):
for room_alias in self.auto_join_rooms:
if not RoomAlias.is_valid(room_alias):
raise ConfigError("Invalid auto_join_rooms entry %s" % (room_alias,))
+
+ # Options for creating auto-join rooms if they do not exist yet.
self.autocreate_auto_join_rooms = config.get("autocreate_auto_join_rooms", True)
+ self.autocreate_auto_join_rooms_federated = config.get(
+ "autocreate_auto_join_rooms_federated", True
+ )
+ self.autocreate_auto_join_room_preset = (
+ config.get("autocreate_auto_join_room_preset")
+ or RoomCreationPreset.PUBLIC_CHAT
+ )
+ self.auto_join_room_requires_invite = self.autocreate_auto_join_room_preset in {
+ RoomCreationPreset.PRIVATE_CHAT,
+ RoomCreationPreset.TRUSTED_PRIVATE_CHAT,
+ }
+
+ # Pull the creater/inviter from the configuration, this gets used to
+ # send invites for invite-only rooms.
+ mxid_localpart = config.get("auto_join_mxid_localpart")
+ self.auto_join_user_id = None
+ if mxid_localpart:
+ # Convert the localpart to a full mxid.
+ self.auto_join_user_id = UserID(
+ mxid_localpart, self.server_name
+ ).to_string()
+
+ if self.autocreate_auto_join_rooms:
+ # Ensure the preset is a known value.
+ if self.autocreate_auto_join_room_preset not in {
+ RoomCreationPreset.PUBLIC_CHAT,
+ RoomCreationPreset.PRIVATE_CHAT,
+ RoomCreationPreset.TRUSTED_PRIVATE_CHAT,
+ }:
+ raise ConfigError("Invalid value for autocreate_auto_join_room_preset")
+ # If the preset requires invitations to be sent, ensure there's a
+ # configured user to send them from.
+ if self.auto_join_room_requires_invite:
+ if not mxid_localpart:
+ raise ConfigError(
+ "The configuration option `auto_join_mxid_localpart` is required if "
+ "`autocreate_auto_join_room_preset` is set to private_chat or trusted_private_chat, such that "
+ "Synapse knows who to send invitations from. Please "
+ "configure `auto_join_mxid_localpart`."
+ )
+
self.auto_join_rooms_for_guests = config.get("auto_join_rooms_for_guests", True)
self.enable_set_displayname = config.get("enable_set_displayname", True)
@@ -357,7 +401,11 @@ class RegistrationConfig(Config):
#enable_3pid_changes: false
# Users who register on this homeserver will automatically be joined
- # to these rooms
+ # to these rooms.
+ #
+ # By default, any room aliases included in this list will be created
+ # as a publicly joinable room when the first user registers for the
+ # homeserver. This behaviour can be customised with the settings below.
#
#auto_join_rooms:
# - "#example:example.com"
@@ -365,10 +413,62 @@ class RegistrationConfig(Config):
# Where auto_join_rooms are specified, setting this flag ensures that the
# the rooms exist by creating them when the first user on the
# homeserver registers.
+ #
+ # By default the auto-created rooms are publicly joinable from any federated
+ # server. Use the autocreate_auto_join_rooms_federated and
+ # autocreate_auto_join_room_preset settings below to customise this behaviour.
+ #
# Setting to false means that if the rooms are not manually created,
# users cannot be auto-joined since they do not exist.
#
- #autocreate_auto_join_rooms: true
+ # Defaults to true. Uncomment the following line to disable automatically
+ # creating auto-join rooms.
+ #
+ #autocreate_auto_join_rooms: false
+
+ # Whether the auto_join_rooms that are auto-created are available via
+ # federation. Only has an effect if autocreate_auto_join_rooms is true.
+ #
+ # Note that whether a room is federated cannot be modified after
+ # creation.
+ #
+ # Defaults to true: the room will be joinable from other servers.
+ # Uncomment the following to prevent users from other homeservers from
+ # joining these rooms.
+ #
+ #autocreate_auto_join_rooms_federated: false
+
+ # The room preset to use when auto-creating one of auto_join_rooms. Only has an
+ # effect if autocreate_auto_join_rooms is true.
+ #
+ # This can be one of "public_chat", "private_chat", or "trusted_private_chat".
+ # If a value of "private_chat" or "trusted_private_chat" is used then
+ # auto_join_mxid_localpart must also be configured.
+ #
+ # Defaults to "public_chat", meaning that the room is joinable by anyone, including
+ # federated servers if autocreate_auto_join_rooms_federated is true (the default).
+ # Uncomment the following to require an invitation to join these rooms.
+ #
+ #autocreate_auto_join_room_preset: private_chat
+
+ # The local part of the user id which is used to create auto_join_rooms if
+ # autocreate_auto_join_rooms is true. If this is not provided then the
+ # initial user account that registers will be used to create the rooms.
+ #
+ # The user id is also used to invite new users to any auto-join rooms which
+ # are set to invite-only.
+ #
+ # It *must* be configured if autocreate_auto_join_room_preset is set to
+ # "private_chat" or "trusted_private_chat".
+ #
+ # Note that this must be specified in order for new users to be correctly
+ # invited to any auto-join rooms which have been set to invite-only (either
+ # at the time of creation or subsequently).
+ #
+ # Note that, if the room already exists, this user must be joined and
+ # have the appropriate permissions to invite new members.
+ #
+ #auto_join_mxid_localpart: system
# When auto_join_rooms is specified, setting this flag to false prevents
# guest accounts from being automatically joined to the rooms.
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index b751d02d37..01009f3924 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -94,6 +94,12 @@ class ContentRepositoryConfig(Config):
else:
self.can_load_media_repo = True
+ # Whether this instance should be the one to run the background jobs to
+ # e.g clean up old URL previews.
+ self.media_instance_running_background_jobs = config.get(
+ "media_instance_running_background_jobs",
+ )
+
self.max_upload_size = self.parse_size(config.get("max_upload_size", "10M"))
self.max_image_pixels = self.parse_size(config.get("max_image_pixels", "32M"))
self.max_spider_size = self.parse_size(config.get("max_spider_size", "10M"))
diff --git a/synapse/config/room.py b/synapse/config/room.py
new file mode 100644
index 0000000000..6aa4de0672
--- /dev/null
+++ b/synapse/config/room.py
@@ -0,0 +1,80 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.api.constants import RoomCreationPreset
+
+from ._base import Config, ConfigError
+
+logger = logging.Logger(__name__)
+
+
+class RoomDefaultEncryptionTypes(object):
+ """Possible values for the encryption_enabled_by_default_for_room_type config option"""
+
+ ALL = "all"
+ INVITE = "invite"
+ OFF = "off"
+
+
+class RoomConfig(Config):
+ section = "room"
+
+ def read_config(self, config, **kwargs):
+ # Whether new, locally-created rooms should have encryption enabled
+ encryption_for_room_type = config.get(
+ "encryption_enabled_by_default_for_room_type",
+ RoomDefaultEncryptionTypes.OFF,
+ )
+ if encryption_for_room_type == RoomDefaultEncryptionTypes.ALL:
+ self.encryption_enabled_by_default_for_room_presets = [
+ RoomCreationPreset.PRIVATE_CHAT,
+ RoomCreationPreset.TRUSTED_PRIVATE_CHAT,
+ RoomCreationPreset.PUBLIC_CHAT,
+ ]
+ elif encryption_for_room_type == RoomDefaultEncryptionTypes.INVITE:
+ self.encryption_enabled_by_default_for_room_presets = [
+ RoomCreationPreset.PRIVATE_CHAT,
+ RoomCreationPreset.TRUSTED_PRIVATE_CHAT,
+ ]
+ elif encryption_for_room_type == RoomDefaultEncryptionTypes.OFF:
+ self.encryption_enabled_by_default_for_room_presets = []
+ else:
+ raise ConfigError(
+ "Invalid value for encryption_enabled_by_default_for_room_type"
+ )
+
+ def generate_config_section(self, **kwargs):
+ return """\
+ ## Rooms ##
+
+ # Controls whether locally-created rooms should be end-to-end encrypted by
+ # default.
+ #
+ # Possible options are "all", "invite", and "off". They are defined as:
+ #
+ # * "all": any locally-created room
+ # * "invite": any room created with the "private_chat" or "trusted_private_chat"
+ # room creation presets
+ # * "off": this option will take no effect
+ #
+ # The default value is "off".
+ #
+ # Note that this option will only affect rooms created after it is set. It
+ # will also not affect rooms created by other servers.
+ #
+ #encryption_enabled_by_default_for_room_type: invite
+ """
diff --git a/synapse/config/saml2_config.py b/synapse/config/saml2_config.py
index d0a19751e8..293643b2de 100644
--- a/synapse/config/saml2_config.py
+++ b/synapse/config/saml2_config.py
@@ -160,7 +160,7 @@ class SAML2Config(Config):
# session lifetime: in milliseconds
self.saml2_session_lifetime = self.parse_duration(
- saml2_config.get("saml_session_lifetime", "5m")
+ saml2_config.get("saml_session_lifetime", "15m")
)
template_dir = saml2_config.get("template_dir")
@@ -286,7 +286,7 @@ class SAML2Config(Config):
# The lifetime of a SAML session. This defines how long a user has to
# complete the authentication process, if allow_unsolicited is unset.
- # The default is 5 minutes.
+ # The default is 15 minutes.
#
#saml_session_lifetime: 5m
diff --git a/synapse/config/server.py b/synapse/config/server.py
index f57eefc99c..8204664883 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -19,7 +19,7 @@ import logging
import os.path
import re
from textwrap import indent
-from typing import Dict, List, Optional
+from typing import Any, Dict, Iterable, List, Optional
import attr
import yaml
@@ -57,6 +57,64 @@ on how to configure the new listener.
--------------------------------------------------------------------------------"""
+KNOWN_LISTENER_TYPES = {
+ "http",
+ "metrics",
+ "manhole",
+ "replication",
+}
+
+KNOWN_RESOURCES = {
+ "client",
+ "consent",
+ "federation",
+ "keys",
+ "media",
+ "metrics",
+ "openid",
+ "replication",
+ "static",
+ "webclient",
+}
+
+
+@attr.s(frozen=True)
+class HttpResourceConfig:
+ names = attr.ib(
+ type=List[str],
+ factory=list,
+ validator=attr.validators.deep_iterable(attr.validators.in_(KNOWN_RESOURCES)), # type: ignore
+ )
+ compress = attr.ib(
+ type=bool,
+ default=False,
+ validator=attr.validators.optional(attr.validators.instance_of(bool)), # type: ignore[arg-type]
+ )
+
+
+@attr.s(frozen=True)
+class HttpListenerConfig:
+ """Object describing the http-specific parts of the config of a listener"""
+
+ x_forwarded = attr.ib(type=bool, default=False)
+ resources = attr.ib(type=List[HttpResourceConfig], factory=list)
+ additional_resources = attr.ib(type=Dict[str, dict], factory=dict)
+ tag = attr.ib(type=str, default=None)
+
+
+@attr.s(frozen=True)
+class ListenerConfig:
+ """Object describing the configuration of a single listener."""
+
+ port = attr.ib(type=int, validator=attr.validators.instance_of(int))
+ bind_addresses = attr.ib(type=List[str])
+ type = attr.ib(type=str, validator=attr.validators.in_(KNOWN_LISTENER_TYPES))
+ tls = attr.ib(type=bool, default=False)
+
+ # http_options is only populated if type=http
+ http_options = attr.ib(type=Optional[HttpListenerConfig], default=None)
+
+
class ServerConfig(Config):
section = "server"
@@ -379,38 +437,21 @@ class ServerConfig(Config):
}
]
- self.listeners = [] # type: List[dict]
- for listener in config.get("listeners", []):
- if not isinstance(listener.get("port", None), int):
- raise ConfigError(
- "Listener configuration is lacking a valid 'port' option"
- )
+ self.listeners = [parse_listener_def(x) for x in config.get("listeners", [])]
- if listener.setdefault("tls", False):
- # no_tls is not really supported any more, but let's grandfather it in
- # here.
- if config.get("no_tls", False):
+ # no_tls is not really supported any more, but let's grandfather it in
+ # here.
+ if config.get("no_tls", False):
+ l2 = []
+ for listener in self.listeners:
+ if listener.tls:
logger.info(
- "Ignoring TLS-enabled listener on port %i due to no_tls"
+ "Ignoring TLS-enabled listener on port %i due to no_tls",
+ listener.port,
)
- continue
-
- bind_address = listener.pop("bind_address", None)
- bind_addresses = listener.setdefault("bind_addresses", [])
-
- # if bind_address was specified, add it to the list of addresses
- if bind_address:
- bind_addresses.append(bind_address)
-
- # if we still have an empty list of addresses, use the default list
- if not bind_addresses:
- if listener["type"] == "metrics":
- # the metrics listener doesn't support IPv6
- bind_addresses.append("0.0.0.0")
else:
- bind_addresses.extend(DEFAULT_BIND_ADDRESSES)
-
- self.listeners.append(listener)
+ l2.append(listener)
+ self.listeners = l2
if not self.web_client_location:
_warn_if_webclient_configured(self.listeners)
@@ -446,43 +487,41 @@ class ServerConfig(Config):
bind_host = config.get("bind_host", "")
gzip_responses = config.get("gzip_responses", True)
+ http_options = HttpListenerConfig(
+ resources=[
+ HttpResourceConfig(names=["client"], compress=gzip_responses),
+ HttpResourceConfig(names=["federation"]),
+ ],
+ )
+
self.listeners.append(
- {
- "port": bind_port,
- "bind_addresses": [bind_host],
- "tls": True,
- "type": "http",
- "resources": [
- {"names": ["client"], "compress": gzip_responses},
- {"names": ["federation"], "compress": False},
- ],
- }
+ ListenerConfig(
+ port=bind_port,
+ bind_addresses=[bind_host],
+ tls=True,
+ type="http",
+ http_options=http_options,
+ )
)
unsecure_port = config.get("unsecure_port", bind_port - 400)
if unsecure_port:
self.listeners.append(
- {
- "port": unsecure_port,
- "bind_addresses": [bind_host],
- "tls": False,
- "type": "http",
- "resources": [
- {"names": ["client"], "compress": gzip_responses},
- {"names": ["federation"], "compress": False},
- ],
- }
+ ListenerConfig(
+ port=unsecure_port,
+ bind_addresses=[bind_host],
+ tls=False,
+ type="http",
+ http_options=http_options,
+ )
)
manhole = config.get("manhole")
if manhole:
self.listeners.append(
- {
- "port": manhole,
- "bind_addresses": ["127.0.0.1"],
- "type": "manhole",
- "tls": False,
- }
+ ListenerConfig(
+ port=manhole, bind_addresses=["127.0.0.1"], type="manhole",
+ )
)
metrics_port = config.get("metrics_port")
@@ -490,13 +529,14 @@ class ServerConfig(Config):
logger.warning(METRICS_PORT_WARNING)
self.listeners.append(
- {
- "port": metrics_port,
- "bind_addresses": [config.get("metrics_bind_host", "127.0.0.1")],
- "tls": False,
- "type": "http",
- "resources": [{"names": ["metrics"], "compress": False}],
- }
+ ListenerConfig(
+ port=metrics_port,
+ bind_addresses=[config.get("metrics_bind_host", "127.0.0.1")],
+ type="http",
+ http_options=HttpListenerConfig(
+ resources=[HttpResourceConfig(names=["metrics"])]
+ ),
+ )
)
_check_resource_config(self.listeners)
@@ -522,7 +562,7 @@ class ServerConfig(Config):
)
def has_tls_listener(self) -> bool:
- return any(listener["tls"] for listener in self.listeners)
+ return any(listener.tls for listener in self.listeners)
def generate_config_section(
self, server_name, data_dir_path, open_private_ports, listeners, **kwargs
@@ -856,7 +896,7 @@ class ServerConfig(Config):
# number of monthly active users.
#
# 'limit_usage_by_mau' disables/enables monthly active user blocking. When
- # anabled and a limit is reached the server returns a 'ResourceLimitError'
+ # enabled and a limit is reached the server returns a 'ResourceLimitError'
# with error type Codes.RESOURCE_LIMIT_EXCEEDED
#
# 'max_mau_value' is the hard limit of monthly active users above which
@@ -1081,6 +1121,44 @@ def read_gc_thresholds(thresholds):
)
+def parse_listener_def(listener: Any) -> ListenerConfig:
+ """parse a listener config from the config file"""
+ listener_type = listener["type"]
+
+ port = listener.get("port")
+ if not isinstance(port, int):
+ raise ConfigError("Listener configuration is lacking a valid 'port' option")
+
+ tls = listener.get("tls", False)
+
+ bind_addresses = listener.get("bind_addresses", [])
+ bind_address = listener.get("bind_address")
+ # if bind_address was specified, add it to the list of addresses
+ if bind_address:
+ bind_addresses.append(bind_address)
+
+ # if we still have an empty list of addresses, use the default list
+ if not bind_addresses:
+ if listener_type == "metrics":
+ # the metrics listener doesn't support IPv6
+ bind_addresses.append("0.0.0.0")
+ else:
+ bind_addresses.extend(DEFAULT_BIND_ADDRESSES)
+
+ http_config = None
+ if listener_type == "http":
+ http_config = HttpListenerConfig(
+ x_forwarded=listener.get("x_forwarded", False),
+ resources=[
+ HttpResourceConfig(**res) for res in listener.get("resources", [])
+ ],
+ additional_resources=listener.get("additional_resources", {}),
+ tag=listener.get("tag"),
+ )
+
+ return ListenerConfig(port, bind_addresses, listener_type, tls, http_config)
+
+
NO_MORE_WEB_CLIENT_WARNING = """
Synapse no longer includes a web client. To enable a web client, configure
web_client_location. To remove this warning, remove 'webclient' from the 'listeners'
@@ -1088,40 +1166,27 @@ configuration.
"""
-def _warn_if_webclient_configured(listeners):
+def _warn_if_webclient_configured(listeners: Iterable[ListenerConfig]) -> None:
for listener in listeners:
- for res in listener.get("resources", []):
- for name in res.get("names", []):
+ if not listener.http_options:
+ continue
+ for res in listener.http_options.resources:
+ for name in res.names:
if name == "webclient":
logger.warning(NO_MORE_WEB_CLIENT_WARNING)
return
-KNOWN_RESOURCES = (
- "client",
- "consent",
- "federation",
- "keys",
- "media",
- "metrics",
- "openid",
- "replication",
- "static",
- "webclient",
-)
-
-
-def _check_resource_config(listeners):
+def _check_resource_config(listeners: Iterable[ListenerConfig]) -> None:
resource_names = {
res_name
for listener in listeners
- for res in listener.get("resources", [])
- for res_name in res.get("names", [])
+ if listener.http_options
+ for res in listener.http_options.resources
+ for res_name in res.names
}
for resource in resource_names:
- if resource not in KNOWN_RESOURCES:
- raise ConfigError("Unknown listener resource '%s'" % (resource,))
if resource == "consent":
try:
check_requirements("resources.consent")
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index a65538562b..e368ea564d 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -20,8 +20,6 @@ from datetime import datetime
from hashlib import sha256
from typing import List
-import six
-
from unpaddedbase64 import encode_base64
from OpenSSL import SSL, crypto
@@ -59,7 +57,7 @@ class TlsConfig(Config):
logger.warning(ACME_SUPPORT_ENABLED_WARN)
# hyperlink complains on py2 if this is not a Unicode
- self.acme_url = six.text_type(
+ self.acme_url = str(
acme_config.get("url", "https://acme-v01.api.letsencrypt.org/directory")
)
self.acme_port = acme_config.get("port", 80)
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index ed06b91a54..dbc661630c 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -16,6 +16,7 @@
import attr
from ._base import Config, ConfigError
+from .server import ListenerConfig, parse_listener_def
@attr.s
@@ -52,7 +53,9 @@ class WorkerConfig(Config):
if self.worker_app == "synapse.app.homeserver":
self.worker_app = None
- self.worker_listeners = config.get("worker_listeners", [])
+ self.worker_listeners = [
+ parse_listener_def(x) for x in config.get("worker_listeners", [])
+ ]
self.worker_daemonize = config.get("worker_daemonize")
self.worker_pid_file = config.get("worker_pid_file")
self.worker_log_config = config.get("worker_log_config")
@@ -75,24 +78,11 @@ class WorkerConfig(Config):
manhole = config.get("worker_manhole")
if manhole:
self.worker_listeners.append(
- {
- "port": manhole,
- "bind_addresses": ["127.0.0.1"],
- "type": "manhole",
- "tls": False,
- }
+ ListenerConfig(
+ port=manhole, bind_addresses=["127.0.0.1"], type="manhole",
+ )
)
- if self.worker_listeners:
- for listener in self.worker_listeners:
- bind_address = listener.pop("bind_address", None)
- bind_addresses = listener.setdefault("bind_addresses", [])
-
- if bind_address:
- bind_addresses.append(bind_address)
- elif not bind_addresses:
- bind_addresses.append("")
-
# A map from instance name to host/port of their HTTP replication endpoint.
instance_map = config.get("instance_map") or {}
self.instance_map = {
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index a9f4025bfe..dbfc3e8972 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -15,11 +15,9 @@
# limitations under the License.
import logging
+import urllib
from collections import defaultdict
-import six
-from six.moves import urllib
-
import attr
from signedjson.key import (
decode_verify_key_bytes,
@@ -661,7 +659,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
for response in query_response["server_keys"]:
# do this first, so that we can give useful errors thereafter
server_name = response.get("server_name")
- if not isinstance(server_name, six.string_types):
+ if not isinstance(server_name, str):
raise KeyLookupError(
"Malformed response from key notary server %s: invalid server_name"
% (perspective_name,)
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 533ba327f5..cc5deca75b 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -20,8 +20,6 @@ import os
from distutils.util import strtobool
from typing import Dict, Optional, Type
-import six
-
from unpaddedbase64 import encode_base64
from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
@@ -290,7 +288,7 @@ class EventBase(metaclass=abc.ABCMeta):
return list(self._dict.items())
def keys(self):
- return six.iterkeys(self._dict)
+ return self._dict.keys()
def prev_event_ids(self):
"""Returns the list of prev event IDs. The order matches the order
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index 7c5f620d09..f94cdcbaba 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -14,8 +14,6 @@
# limitations under the License.
from typing import Optional, Union
-from six import iteritems
-
import attr
from frozendict import frozendict
@@ -341,7 +339,7 @@ def _encode_state_dict(state_dict):
if state_dict is None:
return None
- return [(etype, state_key, v) for (etype, state_key), v in iteritems(state_dict)]
+ return [(etype, state_key, v) for (etype, state_key), v in state_dict.items()]
def _decode_state_dict(input):
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index dd340be9a7..f6b507977f 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -16,8 +16,6 @@ import collections
import re
from typing import Any, Mapping, Union
-from six import string_types
-
from frozendict import frozendict
from twisted.internet import defer
@@ -318,7 +316,7 @@ def serialize_event(
if only_event_fields:
if not isinstance(only_event_fields, list) or not all(
- isinstance(f, string_types) for f in only_event_fields
+ isinstance(f, str) for f in only_event_fields
):
raise TypeError("only_event_fields must be a list of strings")
d = only_fields(d, only_event_fields)
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index b001c64bb4..588d222f36 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from six import integer_types, string_types
-
from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import EventFormatVersions
@@ -53,7 +51,7 @@ class EventValidator(object):
event_strings = ["origin"]
for s in event_strings:
- if not isinstance(getattr(event, s), string_types):
+ if not isinstance(getattr(event, s), str):
raise SynapseError(400, "'%s' not a string type" % (s,))
# Depending on the room version, ensure the data is spec compliant JSON.
@@ -90,7 +88,7 @@ class EventValidator(object):
max_lifetime = event.content.get("max_lifetime")
if min_lifetime is not None:
- if not isinstance(min_lifetime, integer_types):
+ if not isinstance(min_lifetime, int):
raise SynapseError(
code=400,
msg="'min_lifetime' must be an integer",
@@ -124,7 +122,7 @@ class EventValidator(object):
)
if max_lifetime is not None:
- if not isinstance(max_lifetime, integer_types):
+ if not isinstance(max_lifetime, int):
raise SynapseError(
code=400,
msg="'max_lifetime' must be an integer",
@@ -183,7 +181,7 @@ class EventValidator(object):
strings.append("state_key")
for s in strings:
- if not isinstance(getattr(event, s), string_types):
+ if not isinstance(getattr(event, s), str):
raise SynapseError(400, "Not '%s' a string type" % (s,))
RoomID.from_string(event.room_id)
@@ -223,7 +221,7 @@ class EventValidator(object):
for s in keys:
if s not in d:
raise SynapseError(400, "'%s' not in content" % (s,))
- if not isinstance(d[s], string_types):
+ if not isinstance(d[s], str):
raise SynapseError(400, "'%s' not a string type" % (s,))
def _ensure_state_event(self, event):
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index c0012c6872..420df2385f 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -17,8 +17,6 @@ import logging
from collections import namedtuple
from typing import Iterable, List
-import six
-
from twisted.internet import defer
from twisted.internet.defer import Deferred, DeferredList
from twisted.python.failure import Failure
@@ -93,8 +91,8 @@ class FederationBase(object):
# *actual* redacted copy to be on the safe side.)
redacted_event = prune_event(pdu)
if set(redacted_event.keys()) == set(pdu.keys()) and set(
- six.iterkeys(redacted_event.content)
- ) == set(six.iterkeys(pdu.content)):
+ redacted_event.content.keys()
+ ) == set(pdu.content.keys()):
logger.info(
"Event %s seems to have been redacted; using our redacted "
"copy",
@@ -294,7 +292,7 @@ def event_from_pdu_json(
assert_params_in_dict(pdu_json, ("type", "depth"))
depth = pdu_json["depth"]
- if not isinstance(depth, six.integer_types):
+ if not isinstance(depth, int):
raise SynapseError(400, "Depth %r not an intger" % (depth,), Codes.BAD_JSON)
if depth < 0:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 32a8a2ee46..e704cf2f44 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -17,11 +17,8 @@
import logging
from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union
-import six
-from six import iteritems
-
from canonicaljson import json
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
@@ -73,6 +70,10 @@ received_queries_counter = Counter(
"synapse_federation_server_received_queries", "", ["type"]
)
+pdu_process_time = Histogram(
+ "synapse_federation_server_pdu_process_time", "Time taken to process an event",
+)
+
class FederationServer(FederationBase):
def __init__(self, hs):
@@ -274,21 +275,22 @@ class FederationServer(FederationBase):
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
- with nested_logging_context(event_id):
- try:
- await self._handle_received_pdu(origin, pdu)
- pdu_results[event_id] = {}
- except FederationError as e:
- logger.warning("Error handling PDU %s: %s", event_id, e)
- pdu_results[event_id] = {"error": str(e)}
- except Exception as e:
- f = failure.Failure()
- pdu_results[event_id] = {"error": str(e)}
- logger.error(
- "Failed to handle PDU %s",
- event_id,
- exc_info=(f.type, f.value, f.getTracebackObject()),
- )
+ with pdu_process_time.time():
+ with nested_logging_context(event_id):
+ try:
+ await self._handle_received_pdu(origin, pdu)
+ pdu_results[event_id] = {}
+ except FederationError as e:
+ logger.warning("Error handling PDU %s: %s", event_id, e)
+ pdu_results[event_id] = {"error": str(e)}
+ except Exception as e:
+ f = failure.Failure()
+ pdu_results[event_id] = {"error": str(e)}
+ logger.error(
+ "Failed to handle PDU %s",
+ event_id,
+ exc_info=(f.type, f.value, f.getTracebackObject()),
+ )
await concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
@@ -534,9 +536,9 @@ class FederationServer(FederationBase):
",".join(
(
"%s for %s:%s" % (key_id, user_id, device_id)
- for user_id, user_keys in iteritems(json_result)
- for device_id, device_keys in iteritems(user_keys)
- for key_id, _ in iteritems(device_keys)
+ for user_id, user_keys in json_result.items()
+ for device_id, device_keys in user_keys.items()
+ for key_id, _ in device_keys.items()
)
),
)
@@ -752,7 +754,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool:
def _acl_entry_matches(server_name: str, acl_entry: str) -> Match:
- if not isinstance(acl_entry, six.string_types):
+ if not isinstance(acl_entry, str):
logger.warning(
"Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry)
)
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 52f4f54215..6bbd762681 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -33,8 +33,6 @@ import logging
from collections import namedtuple
from typing import Dict, List, Tuple, Type
-from six import iteritems
-
from sortedcontainers import SortedDict
from twisted.internet import defer
@@ -327,7 +325,7 @@ class FederationRemoteSendQueue(object):
# stream position.
keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
- for ((destination, edu_key), pos) in iteritems(keyed_edus):
+ for ((destination, edu_key), pos) in keyed_edus.items():
rows.append(
(
pos,
@@ -530,10 +528,10 @@ def process_rows_for_federation(transaction_queue, rows):
states=[state], destinations=destinations
)
- for destination, edu_map in iteritems(buff.keyed_edus):
+ for destination, edu_map in buff.keyed_edus.items():
for key, edu in edu_map.items():
transaction_queue.send_edu(edu, key)
- for destination, edu_list in iteritems(buff.edus):
+ for destination, edu_list in buff.edus.items():
for edu in edu_list:
transaction_queue.send_edu(edu, None)
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index d473576902..464d7a41de 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -16,8 +16,6 @@
import logging
from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
-from six import itervalues
-
from prometheus_client import Counter
from twisted.internet import defer
@@ -203,7 +201,15 @@ class FederationSender(object):
logger.debug("Sending %s to %r", event, destinations)
- self._send_pdu(event, destinations)
+ if destinations:
+ self._send_pdu(event, destinations)
+
+ now = self.clock.time_msec()
+ ts = await self.store.get_received_ts(event.event_id)
+
+ synapse.metrics.event_processing_lag_by_event.labels(
+ "federation_sender"
+ ).observe((now - ts) / 1000)
async def handle_room_events(events: Iterable[EventBase]) -> None:
with Measure(self.clock, "handle_room_events"):
@@ -218,7 +224,7 @@ class FederationSender(object):
defer.gatherResults(
[
run_in_background(handle_room_events, evs)
- for evs in itervalues(events_by_room)
+ for evs in events_by_room.values()
],
consumeErrors=True,
)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 060bf07197..9f99311419 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -15,10 +15,9 @@
# limitations under the License.
import logging
+import urllib
from typing import Any, Dict, Optional
-from six.moves import urllib
-
from twisted.internet import defer
from synapse.api.constants import Membership
diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py
index 8a9de913b3..8db8ab1b7b 100644
--- a/synapse/groups/groups_server.py
+++ b/synapse/groups/groups_server.py
@@ -17,8 +17,6 @@
import logging
-from six import string_types
-
from synapse.api.errors import Codes, SynapseError
from synapse.types import GroupID, RoomID, UserID, get_domain_from_id
from synapse.util.async_helpers import concurrently_execute
@@ -513,7 +511,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
for keyname in ("name", "avatar_url", "short_description", "long_description"):
if keyname in content:
value = content[keyname]
- if not isinstance(value, string_types):
+ if not isinstance(value, str):
raise SynapseError(400, "%r value is not a string" % (keyname,))
profile[keyname] = value
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index fe62f78e67..904c96eeec 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -15,8 +15,6 @@
import logging
-from six import itervalues
-
from prometheus_client import Counter
from twisted.internet import defer
@@ -116,6 +114,12 @@ class ApplicationServicesHandler(object):
for service in services:
self.scheduler.submit_event_for_as(service, event)
+ now = self.clock.time_msec()
+ ts = yield self.store.get_received_ts(event.event_id)
+ synapse.metrics.event_processing_lag_by_event.labels(
+ "appservice_sender"
+ ).observe((now - ts) / 1000)
+
@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
@@ -125,7 +129,7 @@ class ApplicationServicesHandler(object):
defer.gatherResults(
[
run_in_background(handle_room_events, evs)
- for evs in itervalues(events_by_room)
+ for evs in events_by_room.values()
],
consumeErrors=True,
)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index bb3b43d5ae..c3f86e7414 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -297,7 +297,7 @@ class AuthHandler(BaseHandler):
# Convert the URI and method to strings.
uri = request.uri.decode("utf-8")
- method = request.uri.decode("utf-8")
+ method = request.method.decode("utf-8")
# If there's no session ID, create a new session.
if not sid:
diff --git a/synapse/handlers/cas_handler.py b/synapse/handlers/cas_handler.py
index 64aaa1335c..76f213723a 100644
--- a/synapse/handlers/cas_handler.py
+++ b/synapse/handlers/cas_handler.py
@@ -14,11 +14,10 @@
# limitations under the License.
import logging
+import urllib
import xml.etree.ElementTree as ET
from typing import Dict, Optional, Tuple
-from six.moves import urllib
-
from twisted.web.client import PartialDownloadError
from synapse.api.errors import Codes, LoginError
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 230d170258..31346b56c3 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -17,8 +17,6 @@
import logging
from typing import Any, Dict, Optional
-from six import iteritems, itervalues
-
from twisted.internet import defer
from synapse.api import errors
@@ -159,7 +157,7 @@ class DeviceWorkerHandler(BaseHandler):
# The user may have left the room
# TODO: Check if they actually did or if we were just invited.
if room_id not in room_ids:
- for key, event_id in iteritems(current_state_ids):
+ for key, event_id in current_state_ids.items():
etype, state_key = key
if etype != EventTypes.Member:
continue
@@ -182,7 +180,7 @@ class DeviceWorkerHandler(BaseHandler):
log_kv(
{"event": "encountered empty previous state", "room_id": room_id}
)
- for key, event_id in iteritems(current_state_ids):
+ for key, event_id in current_state_ids.items():
etype, state_key = key
if etype != EventTypes.Member:
continue
@@ -198,10 +196,10 @@ class DeviceWorkerHandler(BaseHandler):
# Check if we've joined the room? If so we just blindly add all the users to
# the "possibly changed" users.
- for state_dict in itervalues(prev_state_ids):
+ for state_dict in prev_state_ids.values():
member_event = state_dict.get((EventTypes.Member, user_id), None)
if not member_event or member_event != current_member_id:
- for key, event_id in iteritems(current_state_ids):
+ for key, event_id in current_state_ids.items():
etype, state_key = key
if etype != EventTypes.Member:
continue
@@ -211,14 +209,14 @@ class DeviceWorkerHandler(BaseHandler):
# If there has been any change in membership, include them in the
# possibly changed list. We'll check if they are joined below,
# and we're not toooo worried about spuriously adding users.
- for key, event_id in iteritems(current_state_ids):
+ for key, event_id in current_state_ids.items():
etype, state_key = key
if etype != EventTypes.Member:
continue
# check if this member has changed since any of the extremities
# at the stream_ordering, and add them to the list if so.
- for state_dict in itervalues(prev_state_ids):
+ for state_dict in prev_state_ids.values():
prev_event_id = state_dict.get(key, None)
if not prev_event_id or prev_event_id != event_id:
if state_key != user_id:
@@ -693,6 +691,7 @@ class DeviceListUpdater(object):
return False
+ @trace
@defer.inlineCallbacks
def _maybe_retry_device_resync(self):
"""Retry to resync device lists that are out of sync, except if another retry is
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 05c4b3eec0..610b08d00b 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -18,8 +18,6 @@ from typing import Any, Dict
from canonicaljson import json
-from twisted.internet import defer
-
from synapse.api.errors import SynapseError
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import (
@@ -51,8 +49,7 @@ class DeviceMessageHandler(object):
self._device_list_updater = hs.get_device_handler().device_list_updater
- @defer.inlineCallbacks
- def on_direct_to_device_edu(self, origin, content):
+ async def on_direct_to_device_edu(self, origin, content):
local_messages = {}
sender_user_id = content["sender"]
if origin != get_domain_from_id(sender_user_id):
@@ -82,11 +79,11 @@ class DeviceMessageHandler(object):
}
local_messages[user_id] = messages_by_device
- yield self._check_for_unknown_devices(
+ await self._check_for_unknown_devices(
message_type, sender_user_id, by_device
)
- stream_id = yield self.store.add_messages_from_remote_to_device_inbox(
+ stream_id = await self.store.add_messages_from_remote_to_device_inbox(
origin, message_id, local_messages
)
@@ -94,14 +91,13 @@ class DeviceMessageHandler(object):
"to_device_key", stream_id, users=local_messages.keys()
)
- @defer.inlineCallbacks
- def _check_for_unknown_devices(
+ async def _check_for_unknown_devices(
self,
message_type: str,
sender_user_id: str,
by_device: Dict[str, Dict[str, Any]],
):
- """Checks inbound device messages for unkown remote devices, and if
+ """Checks inbound device messages for unknown remote devices, and if
found marks the remote cache for the user as stale.
"""
@@ -115,7 +111,7 @@ class DeviceMessageHandler(object):
requesting_device_ids.add(device_id)
# Check if we are tracking the devices of the remote user.
- room_ids = yield self.store.get_rooms_for_user(sender_user_id)
+ room_ids = await self.store.get_rooms_for_user(sender_user_id)
if not room_ids:
logger.info(
"Received device message from remote device we don't"
@@ -127,7 +123,7 @@ class DeviceMessageHandler(object):
# If we are tracking check that we know about the sending
# devices.
- cached_devices = yield self.store.get_cached_devices_for_user(sender_user_id)
+ cached_devices = await self.store.get_cached_devices_for_user(sender_user_id)
unknown_devices = requesting_device_ids - set(cached_devices)
if unknown_devices:
@@ -136,15 +132,14 @@ class DeviceMessageHandler(object):
sender_user_id,
unknown_devices,
)
- yield self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
+ await self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
# Immediately attempt a resync in the background
run_in_background(
self._device_list_updater.user_device_resync, sender_user_id
)
- @defer.inlineCallbacks
- def send_device_message(self, sender_user_id, message_type, messages):
+ async def send_device_message(self, sender_user_id, message_type, messages):
set_tag("number_of_messages", len(messages))
set_tag("sender", sender_user_id)
local_messages = {}
@@ -183,7 +178,7 @@ class DeviceMessageHandler(object):
}
log_kv({"local_messages": local_messages})
- stream_id = yield self.store.add_messages_to_device_inbox(
+ stream_id = await self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index f2f16b1e43..79a2df6201 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -17,8 +17,6 @@ import logging
import string
from typing import Iterable, List, Optional
-from twisted.internet import defer
-
from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes
from synapse.api.errors import (
AuthError,
@@ -55,8 +53,7 @@ class DirectoryHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
- @defer.inlineCallbacks
- def _create_association(
+ async def _create_association(
self,
room_alias: RoomAlias,
room_id: str,
@@ -76,13 +73,13 @@ class DirectoryHandler(BaseHandler):
# TODO(erikj): Add transactions.
# TODO(erikj): Check if there is a current association.
if not servers:
- users = yield self.state.get_current_users_in_room(room_id)
+ users = await self.state.get_current_users_in_room(room_id)
servers = {get_domain_from_id(u) for u in users}
if not servers:
raise SynapseError(400, "Failed to get server list")
- yield self.store.create_room_alias_association(
+ await self.store.create_room_alias_association(
room_alias, room_id, servers, creator=creator
)
@@ -93,7 +90,7 @@ class DirectoryHandler(BaseHandler):
room_id: str,
servers: Optional[List[str]] = None,
check_membership: bool = True,
- ):
+ ) -> None:
"""Attempt to create a new alias
Args:
@@ -103,9 +100,6 @@ class DirectoryHandler(BaseHandler):
servers: Iterable of servers that others servers should try and join via
check_membership: Whether to check if the user is in the room
before the alias can be set (if the server's config requires it).
-
- Returns:
- Deferred
"""
user_id = requester.user.to_string()
@@ -148,7 +142,7 @@ class DirectoryHandler(BaseHandler):
# per alias creation rule?
raise SynapseError(403, "Not allowed to create alias")
- can_create = await self.can_modify_alias(room_alias, user_id=user_id)
+ can_create = self.can_modify_alias(room_alias, user_id=user_id)
if not can_create:
raise AuthError(
400,
@@ -158,7 +152,9 @@ class DirectoryHandler(BaseHandler):
await self._create_association(room_alias, room_id, servers, creator=user_id)
- async def delete_association(self, requester: Requester, room_alias: RoomAlias):
+ async def delete_association(
+ self, requester: Requester, room_alias: RoomAlias
+ ) -> str:
"""Remove an alias from the directory
(this is only meant for human users; AS users should call
@@ -169,7 +165,7 @@ class DirectoryHandler(BaseHandler):
room_alias
Returns:
- Deferred[unicode]: room id that the alias used to point to
+ room id that the alias used to point to
Raises:
NotFoundError: if the alias doesn't exist
@@ -191,7 +187,7 @@ class DirectoryHandler(BaseHandler):
if not can_delete:
raise AuthError(403, "You don't have permission to delete the alias.")
- can_delete = await self.can_modify_alias(room_alias, user_id=user_id)
+ can_delete = self.can_modify_alias(room_alias, user_id=user_id)
if not can_delete:
raise SynapseError(
400,
@@ -208,8 +204,7 @@ class DirectoryHandler(BaseHandler):
return room_id
- @defer.inlineCallbacks
- def delete_appservice_association(
+ async def delete_appservice_association(
self, service: ApplicationService, room_alias: RoomAlias
):
if not service.is_interested_in_alias(room_alias.to_string()):
@@ -218,29 +213,27 @@ class DirectoryHandler(BaseHandler):
"This application service has not reserved this kind of alias",
errcode=Codes.EXCLUSIVE,
)
- yield self._delete_association(room_alias)
+ await self._delete_association(room_alias)
- @defer.inlineCallbacks
- def _delete_association(self, room_alias: RoomAlias):
+ async def _delete_association(self, room_alias: RoomAlias):
if not self.hs.is_mine(room_alias):
raise SynapseError(400, "Room alias must be local")
- room_id = yield self.store.delete_room_alias(room_alias)
+ room_id = await self.store.delete_room_alias(room_alias)
return room_id
- @defer.inlineCallbacks
- def get_association(self, room_alias: RoomAlias):
+ async def get_association(self, room_alias: RoomAlias):
room_id = None
if self.hs.is_mine(room_alias):
- result = yield self.get_association_from_room_alias(room_alias)
+ result = await self.get_association_from_room_alias(room_alias)
if result:
room_id = result.room_id
servers = result.servers
else:
try:
- result = yield self.federation.make_query(
+ result = await self.federation.make_query(
destination=room_alias.domain,
query_type="directory",
args={"room_alias": room_alias.to_string()},
@@ -265,7 +258,7 @@ class DirectoryHandler(BaseHandler):
Codes.NOT_FOUND,
)
- users = yield self.state.get_current_users_in_room(room_id)
+ users = await self.state.get_current_users_in_room(room_id)
extra_servers = {get_domain_from_id(u) for u in users}
servers = set(extra_servers) | set(servers)
@@ -277,13 +270,12 @@ class DirectoryHandler(BaseHandler):
return {"room_id": room_id, "servers": servers}
- @defer.inlineCallbacks
- def on_directory_query(self, args):
+ async def on_directory_query(self, args):
room_alias = RoomAlias.from_string(args["room_alias"])
if not self.hs.is_mine(room_alias):
raise SynapseError(400, "Room Alias is not hosted on this homeserver")
- result = yield self.get_association_from_room_alias(room_alias)
+ result = await self.get_association_from_room_alias(room_alias)
if result is not None:
return {"room_id": result.room_id, "servers": result.servers}
@@ -344,16 +336,15 @@ class DirectoryHandler(BaseHandler):
ratelimit=False,
)
- @defer.inlineCallbacks
- def get_association_from_room_alias(self, room_alias: RoomAlias):
- result = yield self.store.get_association_from_room_alias(room_alias)
+ async def get_association_from_room_alias(self, room_alias: RoomAlias):
+ result = await self.store.get_association_from_room_alias(room_alias)
if not result:
# Query AS to see if it exists
as_handler = self.appservice_handler
- result = yield as_handler.query_room_alias_exists(room_alias)
+ result = await as_handler.query_room_alias_exists(room_alias)
return result
- def can_modify_alias(self, alias: RoomAlias, user_id: Optional[str] = None):
+ def can_modify_alias(self, alias: RoomAlias, user_id: Optional[str] = None) -> bool:
# Any application service "interested" in an alias they are regexing on
# can modify the alias.
# Users can only modify the alias if ALL the interested services have
@@ -366,12 +357,12 @@ class DirectoryHandler(BaseHandler):
for service in interested_services:
if user_id == service.sender:
# this user IS the app service so they can do whatever they like
- return defer.succeed(True)
+ return True
elif service.is_exclusive_alias(alias.to_string()):
# another service has an exclusive lock on this alias.
- return defer.succeed(False)
+ return False
# either no interested services, or no service with an exclusive lock
- return defer.succeed(True)
+ return True
async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str):
"""Determine whether a user can delete an alias.
@@ -459,8 +450,7 @@ class DirectoryHandler(BaseHandler):
await self.store.set_room_is_public(room_id, making_public)
- @defer.inlineCallbacks
- def edit_published_appservice_room_list(
+ async def edit_published_appservice_room_list(
self, appservice_id: str, network_id: str, room_id: str, visibility: str
):
"""Add or remove a room from the appservice/network specific public
@@ -475,7 +465,7 @@ class DirectoryHandler(BaseHandler):
if visibility not in ["public", "private"]:
raise SynapseError(400, "Invalid visibility setting")
- yield self.store.set_room_is_public_appservice(
+ await self.store.set_room_is_public_appservice(
room_id, appservice_id, network_id, visibility == "public"
)
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 774a252619..a7e60cbc26 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -17,8 +17,6 @@
import logging
-from six import iteritems
-
import attr
from canonicaljson import encode_canonical_json, json
from signedjson.key import decode_verify_key_bytes
@@ -135,7 +133,7 @@ class E2eKeysHandler(object):
remote_queries_not_in_cache = {}
if remote_queries:
query_list = []
- for user_id, device_ids in iteritems(remote_queries):
+ for user_id, device_ids in remote_queries.items():
if device_ids:
query_list.extend((user_id, device_id) for device_id in device_ids)
else:
@@ -145,9 +143,9 @@ class E2eKeysHandler(object):
user_ids_not_in_cache,
remote_results,
) = yield self.store.get_user_devices_from_cache(query_list)
- for user_id, devices in iteritems(remote_results):
+ for user_id, devices in remote_results.items():
user_devices = results.setdefault(user_id, {})
- for device_id, device in iteritems(devices):
+ for device_id, device in devices.items():
keys = device.get("keys", None)
device_display_name = device.get("device_display_name", None)
if keys:
@@ -446,9 +444,9 @@ class E2eKeysHandler(object):
",".join(
(
"%s for %s:%s" % (key_id, user_id, device_id)
- for user_id, user_keys in iteritems(json_result)
- for device_id, device_keys in iteritems(user_keys)
- for key_id, _ in iteritems(device_keys)
+ for user_id, user_keys in json_result.items()
+ for device_id, device_keys in user_keys.items()
+ for key_id, _ in device_keys.items()
)
),
)
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 9abaf13b8f..f55470a707 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -16,8 +16,6 @@
import logging
-from six import iteritems
-
from twisted.internet import defer
from synapse.api.errors import (
@@ -205,8 +203,8 @@ class E2eRoomKeysHandler(object):
)
to_insert = [] # batch the inserts together
changed = False # if anything has changed, we need to update the etag
- for room_id, room in iteritems(room_keys["rooms"]):
- for session_id, room_key in iteritems(room["sessions"]):
+ for room_id, room in room_keys["rooms"].items():
+ for session_id, room_key in room["sessions"].items():
if not isinstance(room_key["is_verified"], bool):
msg = (
"is_verified must be a boolean in keys for session %s in"
@@ -351,6 +349,7 @@ class E2eRoomKeysHandler(object):
raise
res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"])
+ res["etag"] = str(res["etag"])
return res
@trace
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d0b62f4cf2..4dbd8e1d98 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -19,12 +19,9 @@
import itertools
import logging
+from http import HTTPStatus
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
-import six
-from six import iteritems, itervalues
-from six.moves import http_client, zip
-
import attr
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
@@ -33,7 +30,12 @@ from unpaddedbase64 import decode_base64
from twisted.internet import defer
from synapse import event_auth
-from synapse.api.constants import EventTypes, Membership, RejectedReason
+from synapse.api.constants import (
+ EventTypes,
+ Membership,
+ RejectedReason,
+ RoomEncryptionAlgorithms,
+)
from synapse.api.errors import (
AuthError,
CodeMessageException,
@@ -374,6 +376,7 @@ class FederationHandler(BaseHandler):
room_version = await self.store.get_room_version_id(room_id)
state_map = await resolve_events_with_store(
+ self.clock,
room_id,
room_version,
state_maps,
@@ -393,7 +396,7 @@ class FederationHandler(BaseHandler):
)
event_map.update(evs)
- state = [event_map[e] for e in six.itervalues(state_map)]
+ state = [event_map[e] for e in state_map.values()]
except Exception:
logger.warning(
"[%s %s] Error attempting to resolve state at missing "
@@ -742,7 +745,10 @@ class FederationHandler(BaseHandler):
if device:
keys = device.get("keys", {}).get("keys", {})
- if event.content.get("algorithm") == "m.megolm.v1.aes-sha2":
+ if (
+ event.content.get("algorithm")
+ == RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2
+ ):
# For this algorithm we expect a curve25519 key.
key_name = "curve25519:%s" % (device_id,)
current_keys = [keys.get(key_name)]
@@ -1001,7 +1007,7 @@ class FederationHandler(BaseHandler):
"""
joined_users = [
(state_key, int(event.depth))
- for (e_type, state_key), event in iteritems(state)
+ for (e_type, state_key), event in state.items()
if e_type == EventTypes.Member and event.membership == Membership.JOIN
]
@@ -1091,16 +1097,16 @@ class FederationHandler(BaseHandler):
states = dict(zip(event_ids, [s.state for s in states]))
state_map = await self.store.get_events(
- [e_id for ids in itervalues(states) for e_id in itervalues(ids)],
+ [e_id for ids in states.values() for e_id in ids.values()],
get_prev_content=False,
)
states = {
key: {
k: state_map[e_id]
- for k, e_id in iteritems(state_dict)
+ for k, e_id in state_dict.items()
if e_id in state_map
}
- for key, state_dict in iteritems(states)
+ for key, state_dict in states.items()
}
for e_id, _ in sorted_extremeties_tuple:
@@ -1188,7 +1194,7 @@ class FederationHandler(BaseHandler):
ev.event_id,
len(ev.prev_event_ids()),
)
- raise SynapseError(http_client.BAD_REQUEST, "Too many prev_events")
+ raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events")
if len(ev.auth_event_ids()) > 10:
logger.warning(
@@ -1196,7 +1202,7 @@ class FederationHandler(BaseHandler):
ev.event_id,
len(ev.auth_event_ids()),
)
- raise SynapseError(http_client.BAD_REQUEST, "Too many auth_events")
+ raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
async def send_invite(self, target_host, event):
""" Sends the invite to the remote server for signing.
@@ -1539,7 +1545,7 @@ class FederationHandler(BaseHandler):
# block any attempts to invite the server notices mxid
if event.state_key == self._server_notices_mxid:
- raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user")
+ raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
# keep a record of the room version, if we don't yet know it.
# (this may get overwritten if we later get a different room version in a
@@ -1725,7 +1731,7 @@ class FederationHandler(BaseHandler):
state_groups = await self.state_store.get_state_groups(room_id, [event_id])
if state_groups:
- _, state = list(iteritems(state_groups)).pop()
+ _, state = list(state_groups.items()).pop()
results = {(e.type, e.state_key): e for e in state}
if event.is_state():
@@ -2088,7 +2094,7 @@ class FederationHandler(BaseHandler):
room_version, state_sets, event
)
current_state_ids = {
- k: e.event_id for k, e in iteritems(current_state_ids)
+ k: e.event_id for k, e in current_state_ids.items()
}
else:
current_state_ids = await self.state_handler.get_current_state_ids(
@@ -2104,7 +2110,7 @@ class FederationHandler(BaseHandler):
# Now check if event pass auth against said current state
auth_types = auth_types_for_event(event)
current_state_ids = [
- e for k, e in iteritems(current_state_ids) if k in auth_types
+ e for k, e in current_state_ids.items() if k in auth_types
]
current_auth_events = await self.store.get_events(current_state_ids)
@@ -2420,7 +2426,7 @@ class FederationHandler(BaseHandler):
else:
event_key = None
state_updates = {
- k: a.event_id for k, a in iteritems(auth_events) if k != event_key
+ k: a.event_id for k, a in auth_events.items() if k != event_key
}
current_state_ids = await context.get_current_state_ids()
@@ -2431,7 +2437,7 @@ class FederationHandler(BaseHandler):
prev_state_ids = await context.get_prev_state_ids()
prev_state_ids = dict(prev_state_ids)
- prev_state_ids.update({k: a.event_id for k, a in iteritems(auth_events)})
+ prev_state_ids.update({k: a.event_id for k, a in auth_events.items()})
# create a new state group as a delta from the existing one.
prev_group = context.state_group
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index ebe8d25bd8..7cb106e365 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -16,8 +16,6 @@
import logging
-from six import iteritems
-
from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
from synapse.types import get_domain_from_id
@@ -227,7 +225,7 @@ class GroupsLocalWorkerHandler(object):
results = {}
failed_results = []
- for destination, dest_user_ids in iteritems(destinations):
+ for destination, dest_user_ids in destinations.items():
try:
r = await self.transport_client.bulk_get_publicised_groups(
destination, list(dest_user_ids)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 649ca1f08a..665ad19b5d 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -17,8 +17,6 @@
import logging
from typing import Optional, Tuple
-from six import iteritems, itervalues, string_types
-
from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
@@ -246,7 +244,7 @@ class MessageHandler(object):
"avatar_url": profile.avatar_url,
"display_name": profile.display_name,
}
- for user_id, profile in iteritems(users_with_profile)
+ for user_id, profile in users_with_profile.items()
}
def maybe_schedule_expiry(self, event):
@@ -715,7 +713,7 @@ class EventCreationHandler(object):
spam_error = self.spam_checker.check_event_for_spam(event)
if spam_error:
- if not isinstance(spam_error, string_types):
+ if not isinstance(spam_error, str):
spam_error = "Spam is not permitted here"
raise SynapseError(403, spam_error, Codes.FORBIDDEN)
@@ -881,7 +879,9 @@ class EventCreationHandler(object):
"""
room_alias = RoomAlias.from_string(room_alias_str)
try:
- mapping = yield directory_handler.get_association(room_alias)
+ mapping = yield defer.ensureDeferred(
+ directory_handler.get_association(room_alias)
+ )
except SynapseError as e:
# Turn M_NOT_FOUND errors into M_BAD_ALIAS errors.
if e.errcode == Codes.NOT_FOUND:
@@ -988,7 +988,7 @@ class EventCreationHandler(object):
state_to_include_ids = [
e_id
- for k, e_id in iteritems(current_state_ids)
+ for k, e_id in current_state_ids.items()
if k[0] in self.room_invite_state_types
or k == (EventTypes.Member, event.sender)
]
@@ -1002,7 +1002,7 @@ class EventCreationHandler(object):
"content": e.content,
"sender": e.sender,
}
- for e in itervalues(state_to_include)
+ for e in state_to_include.values()
]
invitee = UserID.from_string(event.state_key)
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d7442c62a7..da06582d4b 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -15,9 +15,6 @@
# limitations under the License.
import logging
-from six import iteritems
-
-from twisted.internet import defer
from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
@@ -99,8 +96,7 @@ class PaginationHandler(object):
job["longest_max_lifetime"],
)
- @defer.inlineCallbacks
- def purge_history_for_rooms_in_range(self, min_ms, max_ms):
+ async def purge_history_for_rooms_in_range(self, min_ms, max_ms):
"""Purge outdated events from rooms within the given retention range.
If a default retention policy is defined in the server's configuration and its
@@ -139,13 +135,13 @@ class PaginationHandler(object):
include_null,
)
- rooms = yield self.store.get_rooms_for_retention_period_in_range(
+ rooms = await self.store.get_rooms_for_retention_period_in_range(
min_ms, max_ms, include_null
)
logger.debug("[purge] Rooms to purge: %s", rooms)
- for room_id, retention_policy in iteritems(rooms):
+ for room_id, retention_policy in rooms.items():
logger.info("[purge] Attempting to purge messages in room %s", room_id)
if room_id in self._purges_in_progress_by_room:
@@ -167,9 +163,9 @@ class PaginationHandler(object):
# Figure out what token we should start purging at.
ts = self.clock.time_msec() - max_lifetime
- stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
+ stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts)
- r = yield self.store.get_room_event_before_stream_ordering(
+ r = await self.store.get_room_event_before_stream_ordering(
room_id, stream_ordering,
)
if not r:
@@ -229,8 +225,7 @@ class PaginationHandler(object):
)
return purge_id
- @defer.inlineCallbacks
- def _purge_history(self, purge_id, room_id, token, delete_local_events):
+ async def _purge_history(self, purge_id, room_id, token, delete_local_events):
"""Carry out a history purge on a room.
Args:
@@ -239,14 +234,11 @@ class PaginationHandler(object):
token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
-
- Returns:
- Deferred
"""
self._purges_in_progress_by_room.add(room_id)
try:
- with (yield self.pagination_lock.write(room_id)):
- yield self.storage.purge_events.purge_history(
+ with await self.pagination_lock.write(room_id):
+ await self.storage.purge_events.purge_history(
room_id, token, delete_local_events
)
logger.info("[purge] complete")
@@ -284,9 +276,7 @@ class PaginationHandler(object):
await self.store.get_room_version_id(room_id)
# first check that we have no users in this room
- joined = await defer.maybeDeferred(
- self.store.is_host_joined, room_id, self._server_name
- )
+ joined = await self.store.is_host_joined(room_id, self._server_name)
if joined:
raise SynapseError(400, "Users are still joined to this room")
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 3594f3b00f..d2f25ae12a 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -25,9 +25,7 @@ The methods that define policy are:
import abc
import logging
from contextlib import contextmanager
-from typing import Dict, Iterable, List, Set
-
-from six import iteritems, itervalues
+from typing import Dict, Iterable, List, Set, Tuple
from prometheus_client import Counter
from typing_extensions import ContextManager
@@ -170,14 +168,14 @@ class BasePresenceHandler(abc.ABC):
for user_id in user_ids
}
- missing = [user_id for user_id, state in iteritems(states) if not state]
+ missing = [user_id for user_id, state in states.items() if not state]
if missing:
# There are things not in our in memory cache. Lets pull them out of
# the database.
res = await self.store.get_presence_for_users(missing)
states.update(res)
- missing = [user_id for user_id, state in iteritems(states) if not state]
+ missing = [user_id for user_id, state in states.items() if not state]
if missing:
new = {
user_id: UserPresenceState.default(user_id) for user_id in missing
@@ -632,7 +630,7 @@ class PresenceHandler(BasePresenceHandler):
await self._update_states(
[
prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
- for prev_state in itervalues(prev_states)
+ for prev_state in prev_states.values()
]
)
self.external_process_last_updated_ms.pop(process_id, None)
@@ -775,7 +773,9 @@ class PresenceHandler(BasePresenceHandler):
return False
- async def get_all_presence_updates(self, last_id, current_id, limit):
+ async def get_all_presence_updates(
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, list]], int, bool]:
"""
Gets a list of presence update rows from between the given stream ids.
Each row has:
@@ -787,10 +787,31 @@ class PresenceHandler(BasePresenceHandler):
- last_user_sync_ts(int)
- status_msg(int)
- currently_active(int)
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
"""
+
# TODO(markjh): replicate the unpersisted changes.
# This could use the in-memory stores for recent changes.
- rows = await self.store.get_all_presence_updates(last_id, current_id, limit)
+ rows = await self.store.get_all_presence_updates(
+ instance_name, last_id, current_id, limit
+ )
return rows
def notify_new_event(self):
@@ -1087,7 +1108,7 @@ class PresenceEventSource(object):
return (list(updates.values()), max_token)
else:
return (
- [s for s in itervalues(updates) if s.state != PresenceState.OFFLINE],
+ [s for s in updates.values() if s.state != PresenceState.OFFLINE],
max_token,
)
@@ -1323,11 +1344,11 @@ def get_interested_remotes(store, states, state_handler):
# hosts in those rooms.
room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
- for room_id, states in iteritems(room_ids_to_states):
+ for room_id, states in room_ids_to_states.items():
hosts = yield state_handler.get_current_hosts_in_room(room_id)
hosts_and_states.append((hosts, states))
- for user_id, states in iteritems(users_to_states):
+ for user_id, states in users_to_states.items():
host = get_domain_from_id(user_id)
hosts_and_states.append(([host], states))
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 302efc1b9a..4b1e3073a8 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -15,8 +15,6 @@
import logging
-from six import raise_from
-
from twisted.internet import defer
from synapse.api.errors import (
@@ -84,7 +82,7 @@ class BaseProfileHandler(BaseHandler):
)
return result
except RequestSendFailed as e:
- raise_from(SynapseError(502, "Failed to fetch profile"), e)
+ raise SynapseError(502, "Failed to fetch profile") from e
except HttpResponseException as e:
raise e.to_synapse_error()
@@ -135,7 +133,7 @@ class BaseProfileHandler(BaseHandler):
ignore_backoff=True,
)
except RequestSendFailed as e:
- raise_from(SynapseError(502, "Failed to fetch profile"), e)
+ raise SynapseError(502, "Failed to fetch profile") from e
except HttpResponseException as e:
raise e.to_synapse_error()
@@ -212,7 +210,7 @@ class BaseProfileHandler(BaseHandler):
ignore_backoff=True,
)
except RequestSendFailed as e:
- raise_from(SynapseError(502, "Failed to fetch profile"), e)
+ raise SynapseError(502, "Failed to fetch profile") from e
except HttpResponseException as e:
raise e.to_synapse_error()
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 51979ea43e..78c3772ac1 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -17,7 +17,7 @@
import logging
from synapse import types
-from synapse.api.constants import MAX_USERID_LENGTH, LoginType
+from synapse.api.constants import MAX_USERID_LENGTH, EventTypes, JoinRules, LoginType
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
from synapse.config.server import is_threepid_reserved
from synapse.http.servlet import assert_params_in_dict
@@ -26,7 +26,8 @@ from synapse.replication.http.register import (
ReplicationPostRegisterActionsServlet,
ReplicationRegisterServlet,
)
-from synapse.types import RoomAlias, RoomID, UserID, create_requester
+from synapse.storage.state import StateFilter
+from synapse.types import RoomAlias, UserID, create_requester
from synapse.util.async_helpers import Linearizer
from ._base import BaseHandler
@@ -270,51 +271,157 @@ class RegistrationHandler(BaseHandler):
return user_id
- async def _auto_join_rooms(self, user_id):
- """Automatically joins users to auto join rooms - creating the room in the first place
- if the user is the first to be created.
+ async def _create_and_join_rooms(self, user_id: str):
+ """
+ Create the auto-join rooms and join or invite the user to them.
+
+ This should only be called when the first "real" user registers.
Args:
- user_id(str): The user to join
+ user_id: The user to join
"""
- # auto-join the user to any rooms we're supposed to dump them into
- fake_requester = create_requester(user_id)
+ # Getting the handlers during init gives a dependency loop.
+ room_creation_handler = self.hs.get_room_creation_handler()
+ room_member_handler = self.hs.get_room_member_handler()
- # try to create the room if we're the first real user on the server. Note
- # that an auto-generated support or bot user is not a real user and will never be
- # the user to create the room
- should_auto_create_rooms = False
- is_real_user = await self.store.is_real_user(user_id)
- if self.hs.config.autocreate_auto_join_rooms and is_real_user:
- count = await self.store.count_real_users()
- should_auto_create_rooms = count == 1
- for r in self.hs.config.auto_join_rooms:
+ # Generate a stub for how the rooms will be configured.
+ stub_config = {
+ "preset": self.hs.config.registration.autocreate_auto_join_room_preset,
+ }
+
+ # If the configuration providers a user ID to create rooms with, use
+ # that instead of the first user registered.
+ requires_join = False
+ if self.hs.config.registration.auto_join_user_id:
+ fake_requester = create_requester(
+ self.hs.config.registration.auto_join_user_id
+ )
+
+ # If the room requires an invite, add the user to the list of invites.
+ if self.hs.config.registration.auto_join_room_requires_invite:
+ stub_config["invite"] = [user_id]
+
+ # If the room is being created by a different user, the first user
+ # registered needs to join it. Note that in the case of an invitation
+ # being necessary this will occur after the invite was sent.
+ requires_join = True
+ else:
+ fake_requester = create_requester(user_id)
+
+ # Choose whether to federate the new room.
+ if not self.hs.config.registration.autocreate_auto_join_rooms_federated:
+ stub_config["creation_content"] = {"m.federate": False}
+
+ for r in self.hs.config.registration.auto_join_rooms:
logger.info("Auto-joining %s to %s", user_id, r)
+
try:
- if should_auto_create_rooms:
- room_alias = RoomAlias.from_string(r)
- if self.hs.hostname != room_alias.domain:
- logger.warning(
- "Cannot create room alias %s, "
- "it does not match server domain",
- r,
- )
- else:
- # create room expects the localpart of the room alias
- room_alias_localpart = room_alias.localpart
-
- # getting the RoomCreationHandler during init gives a dependency
- # loop
- await self.hs.get_room_creation_handler().create_room(
- fake_requester,
- config={
- "preset": "public_chat",
- "room_alias_name": room_alias_localpart,
- },
+ room_alias = RoomAlias.from_string(r)
+
+ if self.hs.hostname != room_alias.domain:
+ logger.warning(
+ "Cannot create room alias %s, "
+ "it does not match server domain",
+ r,
+ )
+ else:
+ # A shallow copy is OK here since the only key that is
+ # modified is room_alias_name.
+ config = stub_config.copy()
+ # create room expects the localpart of the room alias
+ config["room_alias_name"] = room_alias.localpart
+
+ info, _ = await room_creation_handler.create_room(
+ fake_requester, config=config, ratelimit=False,
+ )
+
+ # If the room does not require an invite, but another user
+ # created it, then ensure the first user joins it.
+ if requires_join:
+ await room_member_handler.update_membership(
+ requester=create_requester(user_id),
+ target=UserID.from_string(user_id),
+ room_id=info["room_id"],
+ # Since it was just created, there are no remote hosts.
+ remote_room_hosts=[],
+ action="join",
ratelimit=False,
)
+
+ except ConsentNotGivenError as e:
+ # Technically not necessary to pull out this error though
+ # moving away from bare excepts is a good thing to do.
+ logger.error("Failed to join new user to %r: %r", r, e)
+ except Exception as e:
+ logger.error("Failed to join new user to %r: %r", r, e)
+
+ async def _join_rooms(self, user_id: str):
+ """
+ Join or invite the user to the auto-join rooms.
+
+ Args:
+ user_id: The user to join
+ """
+ room_member_handler = self.hs.get_room_member_handler()
+
+ for r in self.hs.config.registration.auto_join_rooms:
+ logger.info("Auto-joining %s to %s", user_id, r)
+
+ try:
+ room_alias = RoomAlias.from_string(r)
+
+ if RoomAlias.is_valid(r):
+ (
+ room_id,
+ remote_room_hosts,
+ ) = await room_member_handler.lookup_room_alias(room_alias)
+ room_id = room_id.to_string()
else:
- await self._join_user_to_room(fake_requester, r)
+ raise SynapseError(
+ 400, "%s was not legal room ID or room alias" % (r,)
+ )
+
+ # Calculate whether the room requires an invite or can be
+ # joined directly. Note that unless a join rule of public exists,
+ # it is treated as requiring an invite.
+ requires_invite = True
+
+ state = await self.store.get_filtered_current_state_ids(
+ room_id, StateFilter.from_types([(EventTypes.JoinRules, "")])
+ )
+
+ event_id = state.get((EventTypes.JoinRules, ""))
+ if event_id:
+ join_rules_event = await self.store.get_event(
+ event_id, allow_none=True
+ )
+ if join_rules_event:
+ join_rule = join_rules_event.content.get("join_rule", None)
+ requires_invite = join_rule and join_rule != JoinRules.PUBLIC
+
+ # Send the invite, if necessary.
+ if requires_invite:
+ await room_member_handler.update_membership(
+ requester=create_requester(
+ self.hs.config.registration.auto_join_user_id
+ ),
+ target=UserID.from_string(user_id),
+ room_id=room_id,
+ remote_room_hosts=remote_room_hosts,
+ action="invite",
+ ratelimit=False,
+ )
+
+ # Send the join.
+ await room_member_handler.update_membership(
+ requester=create_requester(user_id),
+ target=UserID.from_string(user_id),
+ room_id=room_id,
+ remote_room_hosts=remote_room_hosts,
+ action="join",
+ ratelimit=False,
+ )
+
except ConsentNotGivenError as e:
# Technically not necessary to pull out this error though
# moving away from bare excepts is a good thing to do.
@@ -322,6 +429,29 @@ class RegistrationHandler(BaseHandler):
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
+ async def _auto_join_rooms(self, user_id: str):
+ """Automatically joins users to auto join rooms - creating the room in the first place
+ if the user is the first to be created.
+
+ Args:
+ user_id: The user to join
+ """
+ # auto-join the user to any rooms we're supposed to dump them into
+
+ # try to create the room if we're the first real user on the server. Note
+ # that an auto-generated support or bot user is not a real user and will never be
+ # the user to create the room
+ should_auto_create_rooms = False
+ is_real_user = await self.store.is_real_user(user_id)
+ if self.hs.config.registration.autocreate_auto_join_rooms and is_real_user:
+ count = await self.store.count_real_users()
+ should_auto_create_rooms = count == 1
+
+ if should_auto_create_rooms:
+ await self._create_and_join_rooms(user_id)
+ else:
+ await self._join_rooms(user_id)
+
async def post_consent_actions(self, user_id):
"""A series of registration actions that can only be carried out once consent
has been granted
@@ -392,30 +522,6 @@ class RegistrationHandler(BaseHandler):
self._next_generated_user_id += 1
return str(id)
- async def _join_user_to_room(self, requester, room_identifier):
- room_member_handler = self.hs.get_room_member_handler()
- if RoomID.is_valid(room_identifier):
- room_id = room_identifier
- elif RoomAlias.is_valid(room_identifier):
- room_alias = RoomAlias.from_string(room_identifier)
- room_id, remote_room_hosts = await room_member_handler.lookup_room_alias(
- room_alias
- )
- room_id = room_id.to_string()
- else:
- raise SynapseError(
- 400, "%s was not legal room ID or room alias" % (room_identifier,)
- )
-
- await room_member_handler.update_membership(
- requester=requester,
- target=requester.user,
- room_id=room_id,
- remote_room_hosts=remote_room_hosts,
- action="join",
- ratelimit=False,
- )
-
def check_registration_ratelimit(self, address):
"""A simple helper method to check whether the registration rate limit has been hit
for a given IP address
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 61db3ccc43..950a84acd0 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -24,9 +24,12 @@ import string
from collections import OrderedDict
from typing import Tuple
-from six import iteritems, string_types
-
-from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
+from synapse.api.constants import (
+ EventTypes,
+ JoinRules,
+ RoomCreationPreset,
+ RoomEncryptionAlgorithms,
+)
from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events.utils import copy_power_levels_contents
@@ -56,31 +59,6 @@ FIVE_MINUTES_IN_MS = 5 * 60 * 1000
class RoomCreationHandler(BaseHandler):
-
- PRESETS_DICT = {
- RoomCreationPreset.PRIVATE_CHAT: {
- "join_rules": JoinRules.INVITE,
- "history_visibility": "shared",
- "original_invitees_have_ops": False,
- "guest_can_join": True,
- "power_level_content_override": {"invite": 0},
- },
- RoomCreationPreset.TRUSTED_PRIVATE_CHAT: {
- "join_rules": JoinRules.INVITE,
- "history_visibility": "shared",
- "original_invitees_have_ops": True,
- "guest_can_join": True,
- "power_level_content_override": {"invite": 0},
- },
- RoomCreationPreset.PUBLIC_CHAT: {
- "join_rules": JoinRules.PUBLIC,
- "history_visibility": "shared",
- "original_invitees_have_ops": False,
- "guest_can_join": False,
- "power_level_content_override": {},
- },
- }
-
def __init__(self, hs):
super(RoomCreationHandler, self).__init__(hs)
@@ -89,6 +67,39 @@ class RoomCreationHandler(BaseHandler):
self.room_member_handler = hs.get_room_member_handler()
self.config = hs.config
+ # Room state based off defined presets
+ self._presets_dict = {
+ RoomCreationPreset.PRIVATE_CHAT: {
+ "join_rules": JoinRules.INVITE,
+ "history_visibility": "shared",
+ "original_invitees_have_ops": False,
+ "guest_can_join": True,
+ "power_level_content_override": {"invite": 0},
+ },
+ RoomCreationPreset.TRUSTED_PRIVATE_CHAT: {
+ "join_rules": JoinRules.INVITE,
+ "history_visibility": "shared",
+ "original_invitees_have_ops": True,
+ "guest_can_join": True,
+ "power_level_content_override": {"invite": 0},
+ },
+ RoomCreationPreset.PUBLIC_CHAT: {
+ "join_rules": JoinRules.PUBLIC,
+ "history_visibility": "shared",
+ "original_invitees_have_ops": False,
+ "guest_can_join": False,
+ "power_level_content_override": {},
+ },
+ }
+
+ # Modify presets to selectively enable encryption by default per homeserver config
+ for preset_name, preset_config in self._presets_dict.items():
+ encrypted = (
+ preset_name
+ in self.config.encryption_enabled_by_default_for_room_presets
+ )
+ preset_config["encrypted"] = encrypted
+
self._replication = hs.get_replication_data_handler()
# linearizer to stop two upgrades happening at once
@@ -364,7 +375,7 @@ class RoomCreationHandler(BaseHandler):
# map from event_id to BaseEvent
old_room_state_events = await self.store.get_events(old_room_state_ids.values())
- for k, old_event_id in iteritems(old_room_state_ids):
+ for k, old_event_id in old_room_state_ids.items():
old_event = old_room_state_events.get(old_event_id)
if old_event:
initial_state[k] = old_event.content
@@ -417,7 +428,7 @@ class RoomCreationHandler(BaseHandler):
old_room_member_state_events = await self.store.get_events(
old_room_member_state_ids.values()
)
- for k, old_event in iteritems(old_room_member_state_events):
+ for k, old_event in old_room_member_state_events.items():
# Only transfer ban events
if (
"membership" in old_event.content
@@ -582,7 +593,7 @@ class RoomCreationHandler(BaseHandler):
"room_version", self.config.default_room_version.identifier
)
- if not isinstance(room_version_id, string_types):
+ if not isinstance(room_version_id, str):
raise SynapseError(400, "room_version must be a string", Codes.BAD_JSON)
room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
@@ -798,7 +809,7 @@ class RoomCreationHandler(BaseHandler):
)
return last_stream_id
- config = RoomCreationHandler.PRESETS_DICT[preset_config]
+ config = self._presets_dict[preset_config]
creator_id = creator.user.to_string()
@@ -888,6 +899,13 @@ class RoomCreationHandler(BaseHandler):
etype=etype, state_key=state_key, content=content
)
+ if config["encrypted"]:
+ last_sent_stream_id = await send(
+ etype=EventTypes.RoomEncryption,
+ state_key="",
+ content={"algorithm": RoomEncryptionAlgorithms.DEFAULT},
+ )
+
return last_sent_stream_id
async def _generate_room_id(
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 4cbc02b0d0..5e05be6181 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -17,8 +17,6 @@ import logging
from collections import namedtuple
from typing import Any, Dict, Optional
-from six import iteritems
-
import msgpack
from unpaddedbase64 import decode_base64, encode_base64
@@ -271,7 +269,7 @@ class RoomListHandler(BaseHandler):
event_map = yield self.store.get_events(
[
event_id
- for key, event_id in iteritems(current_state_ids)
+ for key, event_id in current_state_ids.items()
if key[0]
in (
EventTypes.Create,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0f7af982f0..27c479da9e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -17,10 +17,9 @@
import abc
import logging
+from http import HTTPStatus
from typing import Dict, Iterable, List, Optional, Tuple
-from six.moves import http_client
-
from synapse import types
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
@@ -361,7 +360,7 @@ class RoomMemberHandler(object):
if effective_membership_state == Membership.INVITE:
# block any attempts to invite the server notices mxid
if target.to_string() == self._server_notices_mxid:
- raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user")
+ raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
block_invite = False
@@ -444,7 +443,7 @@ class RoomMemberHandler(object):
is_blocked = await self._is_server_notice_room(room_id)
if is_blocked:
raise SynapseError(
- http_client.FORBIDDEN,
+ HTTPStatus.FORBIDDEN,
"You cannot reject this invite",
errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6bdb24baff..4c7524493e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -18,8 +18,6 @@ import itertools
import logging
from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple
-from six import iteritems, itervalues
-
import attr
from prometheus_client import Counter
@@ -390,7 +388,7 @@ class SyncHandler(object):
# result returned by the event source is poor form (it might cache
# the object)
room_id = event["room_id"]
- event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
+ event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
receipt_key = since_token.receipt_key if since_token else "0"
@@ -408,7 +406,7 @@ class SyncHandler(object):
for event in receipts:
room_id = event["room_id"]
# exclude room id, as above
- event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
+ event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
return now_token, ephemeral_by_room
@@ -454,7 +452,7 @@ class SyncHandler(object):
current_state_ids_map = await self.state.get_current_state_ids(
room_id
)
- current_state_ids = frozenset(itervalues(current_state_ids_map))
+ current_state_ids = frozenset(current_state_ids_map.values())
recents = await filter_events_for_client(
self.storage,
@@ -509,7 +507,7 @@ class SyncHandler(object):
current_state_ids_map = await self.state.get_current_state_ids(
room_id
)
- current_state_ids = frozenset(itervalues(current_state_ids_map))
+ current_state_ids = frozenset(current_state_ids_map.values())
loaded_recents = await filter_events_for_client(
self.storage,
@@ -909,7 +907,7 @@ class SyncHandler(object):
logger.debug("filtering state from %r...", state_ids)
state_ids = {
t: event_id
- for t, event_id in iteritems(state_ids)
+ for t, event_id in state_ids.items()
if cache.get(t[1]) != event_id
}
logger.debug("...to %r", state_ids)
@@ -1430,7 +1428,7 @@ class SyncHandler(object):
if since_token:
for joined_sync in sync_result_builder.joined:
it = itertools.chain(
- joined_sync.timeline.events, itervalues(joined_sync.state)
+ joined_sync.timeline.events, joined_sync.state.values()
)
for event in it:
if event.type == EventTypes.Member:
@@ -1505,7 +1503,7 @@ class SyncHandler(object):
newly_left_rooms = []
room_entries = []
invited = []
- for room_id, events in iteritems(mem_change_events_by_room_id):
+ for room_id, events in mem_change_events_by_room_id.items():
logger.debug(
"Membership changes in %s: [%s]",
room_id,
@@ -1993,17 +1991,17 @@ def _calculate_state(
event_id_to_key = {
e: key
for key, e in itertools.chain(
- iteritems(timeline_contains),
- iteritems(previous),
- iteritems(timeline_start),
- iteritems(current),
+ timeline_contains.items(),
+ previous.items(),
+ timeline_start.items(),
+ current.items(),
)
}
- c_ids = set(itervalues(current))
- ts_ids = set(itervalues(timeline_start))
- p_ids = set(itervalues(previous))
- tc_ids = set(itervalues(timeline_contains))
+ c_ids = set(current.values())
+ ts_ids = set(timeline_start.values())
+ p_ids = set(previous.values())
+ tc_ids = set(timeline_contains.values())
# If we are lazyloading room members, we explicitly add the membership events
# for the senders in the timeline into the state block returned by /sync,
@@ -2017,7 +2015,7 @@ def _calculate_state(
if lazy_load_members:
p_ids.difference_update(
- e for t, e in iteritems(timeline_start) if t[0] == EventTypes.Member
+ e for t, e in timeline_start.items() if t[0] == EventTypes.Member
)
state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c7bc14c623..6c7abaa578 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -15,9 +15,7 @@
import logging
from collections import namedtuple
-from typing import List
-
-from twisted.internet import defer
+from typing import List, Tuple
from synapse.api.errors import AuthError, SynapseError
from synapse.logging.context import run_in_background
@@ -115,8 +113,7 @@ class TypingHandler(object):
def is_typing(self, member):
return member.user_id in self._room_typing.get(member.room_id, [])
- @defer.inlineCallbacks
- def started_typing(self, target_user, auth_user, room_id, timeout):
+ async def started_typing(self, target_user, auth_user, room_id, timeout):
target_user_id = target_user.to_string()
auth_user_id = auth_user.to_string()
@@ -126,7 +123,7 @@ class TypingHandler(object):
if target_user_id != auth_user_id:
raise AuthError(400, "Cannot set another user's typing state")
- yield self.auth.check_user_in_room(room_id, target_user_id)
+ await self.auth.check_user_in_room(room_id, target_user_id)
logger.debug("%s has started typing in %s", target_user_id, room_id)
@@ -145,8 +142,7 @@ class TypingHandler(object):
self._push_update(member=member, typing=True)
- @defer.inlineCallbacks
- def stopped_typing(self, target_user, auth_user, room_id):
+ async def stopped_typing(self, target_user, auth_user, room_id):
target_user_id = target_user.to_string()
auth_user_id = auth_user.to_string()
@@ -156,7 +152,7 @@ class TypingHandler(object):
if target_user_id != auth_user_id:
raise AuthError(400, "Cannot set another user's typing state")
- yield self.auth.check_user_in_room(room_id, target_user_id)
+ await self.auth.check_user_in_room(room_id, target_user_id)
logger.debug("%s has stopped typing in %s", target_user_id, room_id)
@@ -164,12 +160,11 @@ class TypingHandler(object):
self._stopped_typing(member)
- @defer.inlineCallbacks
def user_left_room(self, user, room_id):
user_id = user.to_string()
if self.is_mine_id(user_id):
member = RoomMember(room_id=room_id, user_id=user_id)
- yield self._stopped_typing(member)
+ self._stopped_typing(member)
def _stopped_typing(self, member):
if member.user_id not in self._room_typing.get(member.room_id, set()):
@@ -188,10 +183,9 @@ class TypingHandler(object):
self._push_update_local(member=member, typing=typing)
- @defer.inlineCallbacks
- def _push_remote(self, member, typing):
+ async def _push_remote(self, member, typing):
try:
- users = yield self.state.get_current_users_in_room(member.room_id)
+ users = await self.state.get_current_users_in_room(member.room_id)
self._member_last_federation_poke[member] = self.clock.time_msec()
now = self.clock.time_msec()
@@ -215,8 +209,7 @@ class TypingHandler(object):
except Exception:
logger.exception("Error pushing typing notif to remotes")
- @defer.inlineCallbacks
- def _recv_edu(self, origin, content):
+ async def _recv_edu(self, origin, content):
room_id = content["room_id"]
user_id = content["user_id"]
@@ -231,7 +224,7 @@ class TypingHandler(object):
)
return
- users = yield self.state.get_current_users_in_room(room_id)
+ users = await self.state.get_current_users_in_room(room_id)
domains = {get_domain_from_id(u) for u in users}
if self.server_name in domains:
@@ -259,14 +252,31 @@ class TypingHandler(object):
)
async def get_all_typing_updates(
- self, last_id: int, current_id: int, limit: int
- ) -> List[dict]:
- """Get up to `limit` typing updates between the given tokens, earliest
- updates first.
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, list]], int, bool]:
+ """Get updates for typing replication stream.
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
"""
if last_id == current_id:
- return []
+ return [], current_id, False
changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
last_id
@@ -280,9 +290,16 @@ class TypingHandler(object):
serial = self._room_serials[room_id]
if last_id < serial <= current_id:
typing = self._room_typing[room_id]
- rows.append((serial, room_id, list(typing)))
+ rows.append((serial, [room_id, list(typing)]))
rows.sort()
- return rows[:limit]
+
+ limited = False
+ if len(rows) > limit:
+ rows = rows[:limit]
+ current_id = rows[-1][0]
+ limited = True
+
+ return rows, current_id, limited
def get_current_token(self):
return self._latest_room_serial
@@ -306,7 +323,7 @@ class TypingNotificationEventSource(object):
"content": {"user_ids": list(typing)},
}
- def get_new_events(self, from_key, room_ids, **kwargs):
+ async def get_new_events(self, from_key, room_ids, **kwargs):
with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)
handler = self.get_typing_handler()
@@ -320,7 +337,7 @@ class TypingNotificationEventSource(object):
events.append(self._make_event_for(room_id))
- return defer.succeed((events, handler._latest_room_serial))
+ return (events, handler._latest_room_serial)
def get_current_key(self):
return self.get_typing_handler()._latest_room_serial
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 12423b909a..521b6d620d 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -15,8 +15,6 @@
import logging
-from six import iteritems, iterkeys
-
import synapse.metrics
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.handlers.state_deltas import StateDeltasHandler
@@ -289,7 +287,7 @@ class UserDirectoryHandler(StateDeltasHandler):
users_with_profile = await self.state.get_current_users_in_room(room_id)
# Remove every user from the sharing tables for that room.
- for user_id in iterkeys(users_with_profile):
+ for user_id in users_with_profile.keys():
await self.store.remove_user_who_share_room(user_id, room_id)
# Then, re-add them to the tables.
@@ -298,7 +296,7 @@ class UserDirectoryHandler(StateDeltasHandler):
# which when ran over an entire room, will result in the same values
# being added multiple times. The batching upserts shouldn't make this
# too bad, though.
- for user_id, profile in iteritems(users_with_profile):
+ for user_id, profile in users_with_profile.items():
await self._handle_new_user(room_id, user_id, profile)
async def _handle_new_user(self, room_id, user_id, profile):
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 3cef747a4d..8743e9839d 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -15,11 +15,9 @@
# limitations under the License.
import logging
+import urllib
from io import BytesIO
-from six import raise_from, text_type
-from six.moves import urllib
-
import treq
from canonicaljson import encode_canonical_json, json
from netaddr import IPAddress
@@ -577,7 +575,7 @@ class SimpleHttpClient(object):
# This can happen e.g. because the body is too large.
raise
except Exception as e:
- raise_from(SynapseError(502, ("Failed to download remote body: %s" % e)), e)
+ raise SynapseError(502, ("Failed to download remote body: %s" % e)) from e
return (
length,
@@ -638,7 +636,7 @@ def encode_urlencode_args(args):
def encode_urlencode_arg(arg):
- if isinstance(arg, text_type):
+ if isinstance(arg, str):
return arg.encode("utf-8")
elif isinstance(arg, list):
return [encode_urlencode_arg(i) for i in arg]
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index f5f917f5ae..c5fc746f2f 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -48,6 +48,9 @@ class MatrixFederationAgent(object):
tls_client_options_factory (FederationPolicyForHTTPS|None):
factory to use for fetching client tls options, or none to disable TLS.
+ user_agent (bytes):
+ The user agent header to use for federation requests.
+
_srv_resolver (SrvResolver|None):
SRVResolver impl to use for looking up SRV records. None to use a default
implementation.
@@ -61,6 +64,7 @@ class MatrixFederationAgent(object):
self,
reactor,
tls_client_options_factory,
+ user_agent,
_srv_resolver=None,
_well_known_resolver=None,
):
@@ -78,6 +82,7 @@ class MatrixFederationAgent(object):
),
pool=self._pool,
)
+ self.user_agent = user_agent
if _well_known_resolver is None:
_well_known_resolver = WellKnownResolver(
@@ -87,6 +92,7 @@ class MatrixFederationAgent(object):
pool=self._pool,
contextFactory=tls_client_options_factory,
),
+ user_agent=self.user_agent,
)
self._well_known_resolver = _well_known_resolver
@@ -149,7 +155,7 @@ class MatrixFederationAgent(object):
parsed_uri = urllib.parse.urlparse(uri)
# We need to make sure the host header is set to the netloc of the
- # server.
+ # server and that a user-agent is provided.
if headers is None:
headers = Headers()
else:
@@ -157,6 +163,8 @@ class MatrixFederationAgent(object):
if not headers.hasHeader(b"host"):
headers.addRawHeader(b"host", parsed_uri.netloc)
+ if not headers.hasHeader(b"user-agent"):
+ headers.addRawHeader(b"user-agent", self.user_agent)
res = yield make_deferred_yieldable(
self._agent.request(method, uri, headers, bodyProducer)
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index 7ddfad286d..89a3b041ce 100644
--- a/synapse/http/federation/well_known_resolver.py
+++ b/synapse/http/federation/well_known_resolver.py
@@ -23,6 +23,7 @@ import attr
from twisted.internet import defer
from twisted.web.client import RedirectAgent, readBody
from twisted.web.http import stringToDatetime
+from twisted.web.http_headers import Headers
from synapse.logging.context import make_deferred_yieldable
from synapse.util import Clock
@@ -78,7 +79,12 @@ class WellKnownResolver(object):
"""
def __init__(
- self, reactor, agent, well_known_cache=None, had_well_known_cache=None
+ self,
+ reactor,
+ agent,
+ user_agent,
+ well_known_cache=None,
+ had_well_known_cache=None,
):
self._reactor = reactor
self._clock = Clock(reactor)
@@ -92,6 +98,7 @@ class WellKnownResolver(object):
self._well_known_cache = well_known_cache
self._had_valid_well_known_cache = had_well_known_cache
self._well_known_agent = RedirectAgent(agent)
+ self.user_agent = user_agent
@defer.inlineCallbacks
def get_well_known(self, server_name):
@@ -227,6 +234,10 @@ class WellKnownResolver(object):
uri = b"https://%s/.well-known/matrix/server" % (server_name,)
uri_str = uri.decode("ascii")
+ headers = {
+ b"User-Agent": [self.user_agent],
+ }
+
i = 0
while True:
i += 1
@@ -234,7 +245,9 @@ class WellKnownResolver(object):
logger.info("Fetching %s", uri_str)
try:
response = yield make_deferred_yieldable(
- self._well_known_agent.request(b"GET", uri)
+ self._well_known_agent.request(
+ b"GET", uri, headers=Headers(headers)
+ )
)
body = yield make_deferred_yieldable(readBody(response))
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 2d47b9ea00..18f6a8fd29 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -17,11 +17,9 @@ import cgi
import logging
import random
import sys
+import urllib
from io import BytesIO
-from six import raise_from, string_types
-from six.moves import urllib
-
import attr
import treq
from canonicaljson import encode_canonical_json
@@ -199,7 +197,14 @@ class MatrixFederationHttpClient(object):
self.reactor = Reactor()
- self.agent = MatrixFederationAgent(self.reactor, tls_client_options_factory)
+ user_agent = hs.version_string
+ if hs.config.user_agent_suffix:
+ user_agent = "%s %s" % (user_agent, hs.config.user_agent_suffix)
+ user_agent = user_agent.encode("ascii")
+
+ self.agent = MatrixFederationAgent(
+ self.reactor, tls_client_options_factory, user_agent
+ )
# Use a BlacklistingAgentWrapper to prevent circumventing the IP
# blacklist via IP literals in server names
@@ -432,10 +437,10 @@ class MatrixFederationHttpClient(object):
except TimeoutError as e:
raise RequestSendFailed(e, can_retry=True) from e
except DNSLookupError as e:
- raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
+ raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
except Exception as e:
logger.info("Failed to send request: %s", e)
- raise_from(RequestSendFailed(e, can_retry=True), e)
+ raise RequestSendFailed(e, can_retry=True) from e
incoming_responses_counter.labels(
request.method, response.code
@@ -487,7 +492,7 @@ class MatrixFederationHttpClient(object):
# Retry if the error is a 429 (Too Many Requests),
# otherwise just raise a standard HttpResponseException
if response.code == 429:
- raise_from(RequestSendFailed(e, can_retry=True), e)
+ raise RequestSendFailed(e, can_retry=True) from e
else:
raise e
@@ -998,7 +1003,7 @@ def encode_query_args(args):
encoded_args = {}
for k, vs in args.items():
- if isinstance(vs, string_types):
+ if isinstance(vs, str):
vs = [vs]
encoded_args[k] = [v.encode("UTF-8") for v in vs]
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 2331a2a4b0..d192de7923 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -16,10 +16,10 @@
import collections
import html
-import http.client
import logging
import types
import urllib
+from http import HTTPStatus
from io import BytesIO
from typing import Awaitable, Callable, TypeVar, Union
@@ -188,7 +188,7 @@ def return_html_error(
exc_info=(f.type, f.value, f.getTracebackObject()),
)
else:
- code = http.HTTPStatus.INTERNAL_SERVER_ERROR
+ code = HTTPStatus.INTERNAL_SERVER_ERROR
msg = "Internal server error"
logger.error(
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 167293c46d..cbc37eac6e 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -19,6 +19,7 @@ from typing import Optional
from twisted.python.failure import Failure
from twisted.web.server import Request, Site
+from synapse.config.server import ListenerConfig
from synapse.http import redact_uri
from synapse.http.request_metrics import RequestMetrics, requests_counter
from synapse.logging.context import LoggingContext, PreserveLoggingContext
@@ -350,7 +351,7 @@ class SynapseSite(Site):
self,
logger_name,
site_tag,
- config,
+ config: ListenerConfig,
resource,
server_version_string,
*args,
@@ -360,7 +361,8 @@ class SynapseSite(Site):
self.site_tag = site_tag
- proxied = config.get("x_forwarded", False)
+ assert config.http_options is not None
+ proxied = config.http_options.x_forwarded
self.requestFactory = XForwardedForRequest if proxied else SynapseRequest
self.access_logger = logging.getLogger(logger_name)
self.server_version_string = server_version_string.encode("ascii")
diff --git a/synapse/logging/formatter.py b/synapse/logging/formatter.py
index fbf570c756..d736ad5b9b 100644
--- a/synapse/logging/formatter.py
+++ b/synapse/logging/formatter.py
@@ -16,8 +16,7 @@
import logging
import traceback
-
-from six import StringIO
+from io import StringIO
class LogFormatter(logging.Formatter):
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 5dddf57008..73bef5e5ca 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -171,8 +171,9 @@ import logging
import re
import types
from functools import wraps
-from typing import TYPE_CHECKING, Dict
+from typing import TYPE_CHECKING, Dict, Optional, Type
+import attr
from canonicaljson import json
from twisted.internet import defer
@@ -232,6 +233,30 @@ except ImportError:
LogContextScopeManager = None # type: ignore
+try:
+ from rust_python_jaeger_reporter import Reporter
+
+ @attr.s(slots=True, frozen=True)
+ class _WrappedRustReporter:
+ """Wrap the reporter to ensure `report_span` never throws.
+ """
+
+ _reporter = attr.ib(type=Reporter, default=attr.Factory(Reporter))
+
+ def set_process(self, *args, **kwargs):
+ return self._reporter.set_process(*args, **kwargs)
+
+ def report_span(self, span):
+ try:
+ return self._reporter.report_span(span)
+ except Exception:
+ logger.exception("Failed to report span")
+
+ RustReporter = _WrappedRustReporter # type: Optional[Type[_WrappedRustReporter]]
+except ImportError:
+ RustReporter = None
+
+
logger = logging.getLogger(__name__)
@@ -320,11 +345,19 @@ def init_tracer(hs: "HomeServer"):
set_homeserver_whitelist(hs.config.opentracer_whitelist)
- JaegerConfig(
+ config = JaegerConfig(
config=hs.config.jaeger_config,
service_name="{} {}".format(hs.config.server_name, hs.get_instance_name()),
scope_manager=LogContextScopeManager(hs.config),
- ).initialize_tracer()
+ )
+
+ # If we have the rust jaeger reporter available let's use that.
+ if RustReporter:
+ logger.info("Using rust_python_jaeger_reporter library")
+ tracer = config.create_tracer(RustReporter(), config.sampler)
+ opentracing.set_global_tracer(tracer)
+ else:
+ config.initialize_tracer()
# Whitelisting
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 9cf31f96b3..6035672698 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -22,8 +22,6 @@ import threading
import time
from typing import Callable, Dict, Iterable, Optional, Tuple, Union
-import six
-
import attr
from prometheus_client import Counter, Gauge, Histogram
from prometheus_client.core import (
@@ -83,7 +81,7 @@ class LaterGauge(object):
return
if isinstance(calls, dict):
- for k, v in six.iteritems(calls):
+ for k, v in calls.items():
g.add_metric(k, v)
else:
g.add_metric([], calls)
@@ -194,7 +192,7 @@ class InFlightGauge(object):
gauge = GaugeMetricFamily(
"_".join([self.name, name]), "", labels=self.labels
)
- for key, metrics in six.iteritems(metrics_by_key):
+ for key, metrics in metrics_by_key.items():
gauge.add_metric(key, getattr(metrics, name))
yield gauge
@@ -465,6 +463,12 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"
# finished being processed.
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
+event_processing_lag_by_event = Histogram(
+ "synapse_event_processing_lag_by_event",
+ "Time between an event being persisted and it being queued up to be sent to the relevant remote servers",
+ ["name"],
+)
+
# Build info of the running server.
build_info = Gauge(
"synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py
index ab7f948ed4..4304c60d56 100644
--- a/synapse/metrics/_exposition.py
+++ b/synapse/metrics/_exposition.py
@@ -208,6 +208,7 @@ class MetricsHandler(BaseHTTPRequestHandler):
raise
self.send_response(200)
self.send_header("Content-Type", CONTENT_TYPE_LATEST)
+ self.send_header("Content-Length", str(len(output)))
self.end_headers()
self.wfile.write(output)
@@ -261,4 +262,6 @@ class MetricsResource(Resource):
def render_GET(self, request):
request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii"))
- return generate_latest(self.registry)
+ response = generate_latest(self.registry)
+ request.setHeader(b"Content-Length", str(len(response)))
+ return response
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index e75d964ac8..43ffe6faf0 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -17,8 +17,6 @@
import logging
from collections import namedtuple
-from six import iteritems, itervalues
-
from prometheus_client import Counter
from twisted.internet import defer
@@ -130,7 +128,7 @@ class BulkPushRuleEvaluator(object):
event, prev_state_ids, for_verification=False
)
auth_events = yield self.store.get_events(auth_events_ids)
- auth_events = {(e.type, e.state_key): e for e in itervalues(auth_events)}
+ auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
sender_level = get_user_power_level(event.sender, auth_events)
@@ -162,7 +160,7 @@ class BulkPushRuleEvaluator(object):
condition_cache = {}
- for uid, rules in iteritems(rules_by_user):
+ for uid, rules in rules_by_user.items():
if event.sender == uid:
continue
@@ -395,7 +393,7 @@ class RulesForRoom(object):
# If the event is a join event then it will be in current state evnts
# map but not in the DB, so we have to explicitly insert it.
if event.type == EventTypes.Member:
- for event_id in itervalues(member_event_ids):
+ for event_id in member_event_ids.values():
if event_id == event.event_id:
members[event_id] = (event.state_key, event.membership)
@@ -404,7 +402,7 @@ class RulesForRoom(object):
interested_in_user_ids = {
user_id
- for user_id, membership in itervalues(members)
+ for user_id, membership in members.values()
if membership == Membership.JOIN
}
@@ -415,7 +413,7 @@ class RulesForRoom(object):
)
user_ids = {
- uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher
+ uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
}
logger.debug("With pushers: %r", user_ids)
@@ -436,7 +434,7 @@ class RulesForRoom(object):
)
ret_rules_by_user.update(
- item for item in iteritems(rules_by_user) if item[0] is not None
+ item for item in rules_by_user.items() if item[0] is not None
)
self.update_cache(sequence, members, ret_rules_by_user, state_group)
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index eaaa7afc91..ed60dbc1bf 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -129,6 +129,8 @@ class HttpPusher(object):
@defer.inlineCallbacks
def _update_badge(self):
+ # XXX as per https://github.com/matrix-org/matrix-doc/issues/2627, this seems
+ # to be largely redundant. perhaps we can remove it.
badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
yield self._send_badge(badge)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index d57a66a697..dda560b2c2 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -17,12 +17,11 @@ import email.mime.multipart
import email.utils
import logging
import time
+import urllib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Iterable, List, TypeVar
-from six.moves import urllib
-
import bleach
import jinja2
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 11032491af..8e0d3a416d 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -18,8 +18,6 @@ import logging
import re
from typing import Pattern
-from six import string_types
-
from synapse.events import EventBase
from synapse.types import UserID
from synapse.util.caches import register_cache
@@ -131,7 +129,7 @@ class PushRuleEvaluatorForEvent(object):
# XXX: optimisation: cache our pattern regexps
if condition["key"] == "content.body":
body = self._event.content.get("body", None)
- if not body:
+ if not body or not isinstance(body, str):
return False
return _glob_matches(pattern, body, word_boundary=True)
@@ -147,7 +145,7 @@ class PushRuleEvaluatorForEvent(object):
return False
body = self._event.content.get("body", None)
- if not body:
+ if not body or not isinstance(body, str):
return False
# Similar to _glob_matches, but do not treat display_name as a glob.
@@ -244,7 +242,7 @@ def _flatten_dict(d, prefix=[], result=None):
if result is None:
result = {}
for key, value in d.items():
- if isinstance(value, string_types):
+ if isinstance(value, str):
result[".".join(prefix + [key])] = value.lower()
elif hasattr(value, "items"):
_flatten_dict(value, prefix=(prefix + [key]), result=result)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 88d203aa44..f6a5458681 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -215,11 +215,9 @@ class PusherPool:
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive
- updated_receipts = yield self.store.get_all_updated_receipts(
+ users_affected = yield self.store.get_users_sent_receipts_between(
min_stream_id - 1, max_stream_id
)
- # This returns a tuple, user_id is at index 3
- users_affected = {r[3] for r in updated_receipts}
for u in users_affected:
if u in self.pushers:
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 8b4312e5a3..b1cac901eb 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -66,11 +66,9 @@ REQUIREMENTS = [
"pymacaroons>=0.13.0",
"msgpack>=0.5.2",
"phonenumbers>=8.2.0",
- "six>=1.10",
"prometheus_client>=0.0.18,<0.8.0",
- # we use attr.s(slots), which arrived in 16.0.0
- # Twisted 18.7.0 requires attrs>=17.4.0
- "attrs>=17.4.0",
+ # we use attr.validators.deep_iterable, which arrived in 19.1.0
+ "attrs>=19.1.0",
"netaddr>=0.7.18",
"Jinja2>=2.9",
"bleach>=1.4.3",
@@ -95,7 +93,12 @@ CONDITIONAL_REQUIREMENTS = {
"oidc": ["authlib>=0.14.0"],
"systemd": ["systemd-python>=231"],
"url_preview": ["lxml>=3.5.0"],
- "test": ["mock>=2.0", "parameterized"],
+ # Dependencies which are exclusively required by unit test code. This is
+ # NOT a list of all modules that are necessary to run the unit tests.
+ # Tests assume that all optional dependencies are installed.
+ #
+ # parameterized_class decorator was introduced in parameterized 0.7.0
+ "test": ["mock>=2.0", "parameterized>=0.7.0"],
"sentry": ["sentry-sdk>=0.7.2"],
"opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"],
"jwt": ["pyjwt>=1.6.4"],
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 793cef6c26..9caf1e80c1 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -16,12 +16,10 @@
import abc
import logging
import re
+import urllib
from inspect import signature
from typing import Dict, List, Tuple
-from six import raise_from
-from six.moves import urllib
-
from twisted.internet import defer
from synapse.api.errors import (
@@ -220,7 +218,7 @@ class ReplicationEndpoint(object):
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
except RequestSendFailed as e:
- raise_from(SynapseError(502, "Failed to talk to master"), e)
+ raise SynapseError(502, "Failed to talk to master") from e
return result
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index c04f622816..ea5937a20c 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -149,7 +149,7 @@ class RdataCommand(Command):
class PositionCommand(Command):
- """Sent by the server to tell the client the stream postition without
+ """Sent by the server to tell the client the stream position without
needing to send an RDATA.
Format::
@@ -188,7 +188,7 @@ class ErrorCommand(_SimpleCommand):
class PingCommand(_SimpleCommand):
- """Sent by either side as a keep alive. The data is arbitary (often timestamp)
+ """Sent by either side as a keep alive. The data is arbitrary (often timestamp)
"""
NAME = "PING"
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index cbcf46f3ae..e6a2e2598b 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -112,8 +112,8 @@ class ReplicationCommandHandler:
"replication_position", clock=self._clock
)
- # Map of stream to batched updates. See RdataCommand for info on how
- # batching works.
+ # Map of stream name to batched updates. See RdataCommand for info on
+ # how batching works.
self._pending_batches = {} # type: Dict[str, List[Any]]
# The factory used to create connections.
@@ -123,7 +123,8 @@ class ReplicationCommandHandler:
# outgoing replication commands to.)
self._connections = [] # type: List[AbstractConnection]
- # For each connection, the incoming streams that are coming from that connection
+ # For each connection, the incoming stream names that are coming from
+ # that connection.
self._streams_by_connection = {} # type: Dict[AbstractConnection, Set[str]]
LaterGauge(
@@ -310,7 +311,28 @@ class ReplicationCommandHandler:
# Check if this is the last of a batch of updates
rows = self._pending_batches.pop(stream_name, [])
rows.append(row)
- await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)
+
+ stream = self._streams.get(stream_name)
+ if not stream:
+ logger.error("Got RDATA for unknown stream: %s", stream_name)
+ return
+
+ # Find where we previously streamed up to.
+ current_token = stream.current_token(cmd.instance_name)
+
+ # Discard this data if this token is earlier than the current
+ # position. Note that streams can be reset (in which case you
+ # expect an earlier token), but that must be preceded by a
+ # POSITION command.
+ if cmd.token <= current_token:
+ logger.debug(
+ "Discarding RDATA from stream %s at position %s before previous position %s",
+ stream_name,
+ cmd.token,
+ current_token,
+ )
+ else:
+ await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)
async def on_rdata(
self, stream_name: str, instance_name: str, token: int, rows: list
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 4acefc8a96..f196eff072 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -264,7 +264,7 @@ class BackfillStream(Stream):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_current_backfill_token),
- db_query_to_update_function(store.get_all_new_backfill_event_rows),
+ store.get_all_new_backfill_event_rows,
)
@@ -291,9 +291,7 @@ class PresenceStream(Stream):
if hs.config.worker_app is None:
# on the master, query the presence handler
presence_handler = hs.get_presence_handler()
- update_function = db_query_to_update_function(
- presence_handler.get_all_presence_updates
- )
+ update_function = presence_handler.get_all_presence_updates
else:
# Query master process
update_function = make_http_update_function(hs, self.NAME)
@@ -318,9 +316,7 @@ class TypingStream(Stream):
if hs.config.worker_app is None:
# on the master, query the typing handler
- update_function = db_query_to_update_function(
- typing_handler.get_all_typing_updates
- )
+ update_function = typing_handler.get_all_typing_updates
else:
# Query master process
update_function = make_http_update_function(hs, self.NAME)
@@ -352,7 +348,7 @@ class ReceiptsStream(Stream):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_max_receipt_stream_id),
- db_query_to_update_function(store.get_all_updated_receipts),
+ store.get_all_updated_receipts,
)
@@ -367,26 +363,17 @@ class PushRulesStream(Stream):
def __init__(self, hs):
self.store = hs.get_datastore()
+
super(PushRulesStream, self).__init__(
- hs.get_instance_name(), self._current_token, self._update_function
+ hs.get_instance_name(),
+ self._current_token,
+ self.store.get_all_push_rule_updates,
)
def _current_token(self, instance_name: str) -> int:
push_rules_token, _ = self.store.get_push_rules_stream_token()
return push_rules_token
- async def _update_function(
- self, instance_name: str, from_token: Token, to_token: Token, limit: int
- ):
- rows = await self.store.get_all_push_rule_updates(from_token, to_token, limit)
-
- limited = False
- if len(rows) == limit:
- to_token = rows[-1][0]
- limited = True
-
- return [(row[0], (row[2],)) for row in rows], to_token, limited
-
class PushersStream(Stream):
"""A user has added/changed/removed a pusher
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 8173baef8f..e07c32118d 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -15,7 +15,7 @@
import logging
from typing import List, Optional
-from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.api.constants import EventTypes, JoinRules, Membership, RoomCreationPreset
from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.http.servlet import (
RestServlet,
@@ -77,7 +77,7 @@ class ShutdownRoomRestServlet(RestServlet):
info, stream_id = await self._room_creation_handler.create_room(
room_creator_requester,
config={
- "preset": "public_chat",
+ "preset": RoomCreationPreset.PUBLIC_CHAT,
"name": room_name,
"power_level_content_override": {"users_default": -10},
},
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index fefc8f71fa..e4330c39d6 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -16,9 +16,7 @@ import hashlib
import hmac
import logging
import re
-
-from six import text_type
-from six.moves import http_client
+from http import HTTPStatus
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, NotFoundError, SynapseError
@@ -215,10 +213,7 @@ class UserRestServletV2(RestServlet):
await self.store.set_server_admin(target_user, set_admin_to)
if "password" in body:
- if (
- not isinstance(body["password"], text_type)
- or len(body["password"]) > 512
- ):
+ if not isinstance(body["password"], str) or len(body["password"]) > 512:
raise SynapseError(400, "Invalid password")
else:
new_password = body["password"]
@@ -252,7 +247,7 @@ class UserRestServletV2(RestServlet):
password = body.get("password")
password_hash = None
if password is not None:
- if not isinstance(password, text_type) or len(password) > 512:
+ if not isinstance(password, str) or len(password) > 512:
raise SynapseError(400, "Invalid password")
password_hash = await self.auth_handler.hash(password)
@@ -370,10 +365,7 @@ class UserRegisterServlet(RestServlet):
400, "username must be specified", errcode=Codes.BAD_JSON
)
else:
- if (
- not isinstance(body["username"], text_type)
- or len(body["username"]) > 512
- ):
+ if not isinstance(body["username"], str) or len(body["username"]) > 512:
raise SynapseError(400, "Invalid username")
username = body["username"].encode("utf-8")
@@ -386,7 +378,7 @@ class UserRegisterServlet(RestServlet):
)
else:
password = body["password"]
- if not isinstance(password, text_type) or len(password) > 512:
+ if not isinstance(password, str) or len(password) > 512:
raise SynapseError(400, "Invalid password")
password_bytes = password.encode("utf-8")
@@ -477,7 +469,7 @@ class DeactivateAccountRestServlet(RestServlet):
erase = body.get("erase", False)
if not isinstance(erase, bool):
raise SynapseError(
- http_client.BAD_REQUEST,
+ HTTPStatus.BAD_REQUEST,
"Param 'erase' must be a boolean, if given",
Codes.BAD_JSON,
)
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index dceb2792fa..bf0f9bd077 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -60,10 +60,18 @@ def login_id_thirdparty_from_phone(identifier):
Returns: Login identifier dict of type 'm.id.threepid'
"""
- if "country" not in identifier or "number" not in identifier:
+ if "country" not in identifier or (
+ # The specification requires a "phone" field, while Synapse used to require a "number"
+ # field. Accept both for backwards compatibility.
+ "phone" not in identifier
+ and "number" not in identifier
+ ):
raise SynapseError(400, "Invalid phone-type identifier")
- msisdn = phone_number_to_msisdn(identifier["country"], identifier["number"])
+ # Accept both "phone" and "number" as valid keys in m.id.phone
+ phone_number = identifier.get("phone", identifier["number"])
+
+ msisdn = phone_number_to_msisdn(identifier["country"], phone_number)
return {"type": "m.id.thirdparty", "medium": "msisdn", "address": msisdn}
@@ -73,7 +81,8 @@ class LoginRestServlet(RestServlet):
CAS_TYPE = "m.login.cas"
SSO_TYPE = "m.login.sso"
TOKEN_TYPE = "m.login.token"
- JWT_TYPE = "m.login.jwt"
+ JWT_TYPE = "org.matrix.login.jwt"
+ JWT_TYPE_DEPRECATED = "m.login.jwt"
def __init__(self, hs):
super(LoginRestServlet, self).__init__()
@@ -108,6 +117,7 @@ class LoginRestServlet(RestServlet):
flows = []
if self.jwt_enabled:
flows.append({"type": LoginRestServlet.JWT_TYPE})
+ flows.append({"type": LoginRestServlet.JWT_TYPE_DEPRECATED})
if self.cas_enabled:
# we advertise CAS for backwards compat, though MSC1721 renamed it
@@ -141,6 +151,7 @@ class LoginRestServlet(RestServlet):
try:
if self.jwt_enabled and (
login_submission["type"] == LoginRestServlet.JWT_TYPE
+ or login_submission["type"] == LoginRestServlet.JWT_TYPE_DEPRECATED
):
result = await self.do_jwt_login(login_submission)
elif login_submission["type"] == LoginRestServlet.TOKEN_TYPE:
diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py
index eec16f8ad8..970fdd5834 100644
--- a/synapse/rest/client/v1/presence.py
+++ b/synapse/rest/client/v1/presence.py
@@ -17,8 +17,6 @@
"""
import logging
-from six import string_types
-
from synapse.api.errors import AuthError, SynapseError
from synapse.handlers.presence import format_user_presence_state
from synapse.http.servlet import RestServlet, parse_json_object_from_request
@@ -51,7 +49,9 @@ class PresenceStatusRestServlet(RestServlet):
raise AuthError(403, "You are not allowed to see their presence.")
state = await self.presence_handler.get_state(target_user=user)
- state = format_user_presence_state(state, self.clock.time_msec())
+ state = format_user_presence_state(
+ state, self.clock.time_msec(), include_user_id=False
+ )
return 200, state
@@ -71,7 +71,7 @@ class PresenceStatusRestServlet(RestServlet):
if "status_msg" in content:
state["status_msg"] = content.pop("status_msg")
- if not isinstance(state["status_msg"], string_types):
+ if not isinstance(state["status_msg"], str):
raise SynapseError(400, "status_msg must be a string.")
if content:
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 105e0cf4d2..46811abbfa 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -18,8 +18,7 @@
import logging
import re
from typing import List, Optional
-
-from six.moves.urllib import parse as urlparse
+from urllib import parse as urlparse
from canonicaljson import json
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index b58a77826f..182a308eef 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -15,8 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-
-from six.moves import http_client
+from http import HTTPStatus
from synapse.api.constants import LoginType
from synapse.api.errors import Codes, SynapseError, ThreepidValidationError
@@ -320,7 +319,7 @@ class DeactivateAccountRestServlet(RestServlet):
erase = body.get("erase", False)
if not isinstance(erase, bool):
raise SynapseError(
- http_client.BAD_REQUEST,
+ HTTPStatus.BAD_REQUEST,
"Param 'erase' must be a boolean, if given",
Codes.BAD_JSON,
)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index c8d2de7b54..56a451c42f 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -18,8 +18,6 @@ import hmac
import logging
from typing import List, Union
-from six import string_types
-
import synapse
import synapse.api.auth
import synapse.types
@@ -411,7 +409,7 @@ class RegisterRestServlet(RestServlet):
# in sessions. Pull out the username/password provided to us.
if "password" in body:
password = body.pop("password")
- if not isinstance(password, string_types) or len(password) > 512:
+ if not isinstance(password, str) or len(password) > 512:
raise SynapseError(400, "Invalid password")
self.password_policy_handler.validate_password(password)
@@ -423,10 +421,7 @@ class RegisterRestServlet(RestServlet):
desired_username = None
if "username" in body:
- if (
- not isinstance(body["username"], string_types)
- or len(body["username"]) > 512
- ):
+ if not isinstance(body["username"], str) or len(body["username"]) > 512:
raise SynapseError(400, "Invalid username")
desired_username = body["username"]
@@ -451,7 +446,7 @@ class RegisterRestServlet(RestServlet):
access_token = self.auth.get_access_token_from_request(request)
- if isinstance(desired_username, string_types):
+ if isinstance(desired_username, str):
result = await self._do_appservice_registration(
desired_username, access_token, body
)
diff --git a/synapse/rest/client/v2_alpha/report_event.py b/synapse/rest/client/v2_alpha/report_event.py
index f067b5edac..e15927c4ea 100644
--- a/synapse/rest/client/v2_alpha/report_event.py
+++ b/synapse/rest/client/v2_alpha/report_event.py
@@ -14,9 +14,7 @@
# limitations under the License.
import logging
-
-from six import string_types
-from six.moves import http_client
+from http import HTTPStatus
from synapse.api.errors import Codes, SynapseError
from synapse.http.servlet import (
@@ -47,15 +45,15 @@ class ReportEventRestServlet(RestServlet):
body = parse_json_object_from_request(request)
assert_params_in_dict(body, ("reason", "score"))
- if not isinstance(body["reason"], string_types):
+ if not isinstance(body["reason"], str):
raise SynapseError(
- http_client.BAD_REQUEST,
+ HTTPStatus.BAD_REQUEST,
"Param 'reason' must be a string",
Codes.BAD_JSON,
)
if not isinstance(body["score"], int):
raise SynapseError(
- http_client.BAD_REQUEST,
+ HTTPStatus.BAD_REQUEST,
"Param 'score' must be an integer",
Codes.BAD_JSON,
)
diff --git a/synapse/rest/consent/consent_resource.py b/synapse/rest/consent/consent_resource.py
index 4a20282d1b..0a890c98cb 100644
--- a/synapse/rest/consent/consent_resource.py
+++ b/synapse/rest/consent/consent_resource.py
@@ -16,10 +16,9 @@
import hmac
import logging
from hashlib import sha256
+from http import HTTPStatus
from os import path
-from six.moves import http_client
-
import jinja2
from jinja2 import TemplateNotFound
@@ -219,4 +218,4 @@ class ConsentResource(DirectServeResource):
)
if not compare_digest(want_mac, userhmac):
- raise SynapseError(http_client.FORBIDDEN, "HMAC incorrect")
+ raise SynapseError(HTTPStatus.FORBIDDEN, "HMAC incorrect")
diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py
index 3689777266..595849f9d5 100644
--- a/synapse/rest/media/v1/_base.py
+++ b/synapse/rest/media/v1/_base.py
@@ -16,8 +16,7 @@
import logging
import os
-
-from six.moves import urllib
+import urllib
from twisted.internet import defer
from twisted.protocols.basic import FileSender
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index fd10d42f2f..45628c07b4 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -20,8 +20,6 @@ import os
import shutil
from typing import Dict, Tuple
-from six import iteritems
-
import twisted.internet.error
import twisted.web.http
from twisted.web.resource import Resource
@@ -340,7 +338,7 @@ class MediaRepository(object):
with self.media_storage.store_into_file(file_info) as (f, fname, finish):
request_path = "/".join(
- ("/_matrix/media/v1/download", server_name, media_id)
+ ("/_matrix/media/r0/download", server_name, media_id)
)
try:
length, headers = await self.client.get_file(
@@ -606,7 +604,7 @@ class MediaRepository(object):
thumbnails[(t_width, t_height, r_type)] = r_method
# Now we generate the thumbnails for each dimension, store it
- for (t_width, t_height, t_type), t_method in iteritems(thumbnails):
+ for (t_width, t_height, t_type), t_method in thumbnails.items():
# Generate the thumbnail
if t_method == "crop":
t_byte_source = await defer_to_thread(
@@ -705,7 +703,7 @@ class MediaRepositoryResource(Resource):
Uploads are POSTed to a resource which returns a token which is used to GET
the download::
- => POST /_matrix/media/v1/upload HTTP/1.1
+ => POST /_matrix/media/r0/upload HTTP/1.1
Content-Type: <media-type>
Content-Length: <content-length>
@@ -716,7 +714,7 @@ class MediaRepositoryResource(Resource):
{ "content_uri": "mxc://<server-name>/<media-id>" }
- => GET /_matrix/media/v1/download/<server-name>/<media-id> HTTP/1.1
+ => GET /_matrix/media/r0/download/<server-name>/<media-id> HTTP/1.1
<= HTTP/1.1 200 OK
Content-Type: <media-type>
@@ -727,7 +725,7 @@ class MediaRepositoryResource(Resource):
Clients can get thumbnails by supplying a desired width and height and
thumbnailing method::
- => GET /_matrix/media/v1/thumbnail/<server_name>
+ => GET /_matrix/media/r0/thumbnail/<server_name>
/<media-id>?width=<w>&height=<h>&method=<m> HTTP/1.1
<= HTTP/1.1 200 OK
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index 683a79c966..79cb0dddbe 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -17,9 +17,6 @@ import contextlib
import logging
import os
import shutil
-import sys
-
-import six
from twisted.internet import defer
from twisted.protocols.basic import FileSender
@@ -117,12 +114,11 @@ class MediaStorage(object):
with open(fname, "wb") as f:
yield f, fname, finish
except Exception:
- t, v, tb = sys.exc_info()
try:
os.remove(fname)
except Exception:
pass
- six.reraise(t, v, tb)
+ raise
if not finished_called:
raise Exception("Finished callback not called")
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index f206605727..b4645cd608 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -24,10 +24,7 @@ import shutil
import sys
import traceback
from typing import Dict, Optional
-
-import six
-from six import string_types
-from six.moves import urllib_parse as urlparse
+from urllib import parse as urlparse
from canonicaljson import json
@@ -85,6 +82,15 @@ class PreviewUrlResource(DirectServeResource):
self.primary_base_path = media_repo.primary_base_path
self.media_storage = media_storage
+ # We run the background jobs if we're the instance specified (or no
+ # instance is specified, where we assume there is only one instance
+ # serving media).
+ instance_running_jobs = hs.config.media.media_instance_running_background_jobs
+ self._worker_run_media_background_jobs = (
+ instance_running_jobs is None
+ or instance_running_jobs == hs.get_instance_name()
+ )
+
self.url_preview_url_blacklist = hs.config.url_preview_url_blacklist
self.url_preview_accept_language = hs.config.url_preview_accept_language
@@ -97,9 +103,10 @@ class PreviewUrlResource(DirectServeResource):
expiry_ms=60 * 60 * 1000,
)
- self._cleaner_loop = self.clock.looping_call(
- self._start_expire_url_cache_data, 10 * 1000
- )
+ if self._worker_run_media_background_jobs:
+ self._cleaner_loop = self.clock.looping_call(
+ self._start_expire_url_cache_data, 10 * 1000
+ )
def render_OPTIONS(self, request):
request.setHeader(b"Allow", b"OPTIONS, GET")
@@ -188,7 +195,7 @@ class PreviewUrlResource(DirectServeResource):
# It may be stored as text in the database, not as bytes (such as
# PostgreSQL). If so, encode it back before handing it on.
og = cache_result["og"]
- if isinstance(og, six.text_type):
+ if isinstance(og, str):
og = og.encode("utf8")
return og
@@ -400,6 +407,8 @@ class PreviewUrlResource(DirectServeResource):
"""
# TODO: Delete from backup media store
+ assert self._worker_run_media_background_jobs
+
now = self.clock.time_msec()
logger.debug("Running url preview cache expiry")
@@ -631,7 +640,7 @@ def _iterate_over_text(tree, *tags_to_ignore):
if el is None:
return
- if isinstance(el, string_types):
+ if isinstance(el, str):
yield el
elif el.tag not in tags_to_ignore:
# el.text is the text before the first child, so we can immediately
diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py
index 3bf330da49..3bfc8d7278 100644
--- a/synapse/server_notices/consent_server_notices.py
+++ b/synapse/server_notices/consent_server_notices.py
@@ -14,8 +14,6 @@
# limitations under the License.
import logging
-from six import iteritems, string_types
-
from synapse.api.errors import SynapseError
from synapse.api.urls import ConsentURIBuilder
from synapse.config import ConfigError
@@ -118,10 +116,10 @@ def copy_with_str_subst(x, substitutions):
Returns:
copy of x
"""
- if isinstance(x, string_types):
+ if isinstance(x, str):
return x % substitutions
if isinstance(x, dict):
- return {k: copy_with_str_subst(v, substitutions) for (k, v) in iteritems(x)}
+ return {k: copy_with_str_subst(v, substitutions) for (k, v) in x.items()}
if isinstance(x, (list, tuple)):
return [copy_with_str_subst(y) for y in x]
diff --git a/synapse/server_notices/resource_limits_server_notices.py b/synapse/server_notices/resource_limits_server_notices.py
index 73f2cedb5c..4404ceff93 100644
--- a/synapse/server_notices/resource_limits_server_notices.py
+++ b/synapse/server_notices/resource_limits_server_notices.py
@@ -14,8 +14,6 @@
# limitations under the License.
import logging
-from six import iteritems
-
from synapse.api.constants import (
EventTypes,
LimitBlockingTypes,
@@ -214,7 +212,7 @@ class ResourceLimitsServerNotices(object):
referenced_events = list(pinned_state_event.content.get("pinned", []))
events = await self._store.get_events(referenced_events)
- for event_id, event in iteritems(events):
+ for event_id, event in events.items():
if event.type != EventTypes.Message:
continue
if event.content.get("msgtype") == ServerNoticeMsgType:
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 2fa529fcd0..495d9f04c8 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -18,8 +18,6 @@ import logging
from collections import namedtuple
from typing import Dict, Iterable, List, Optional, Set
-from six import iteritems, itervalues
-
import attr
from frozendict import frozendict
from prometheus_client import Histogram
@@ -34,6 +32,7 @@ from synapse.logging.utils import log_function
from synapse.state import v1, v2
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.types import StateMap
+from synapse.util import Clock
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import Measure, measure_func
@@ -144,7 +143,7 @@ class StateHandler(object):
list(state.values()), get_prev_content=False
)
state = {
- key: state_map[e_id] for key, e_id in iteritems(state) if e_id in state_map
+ key: state_map[e_id] for key, e_id in state.items() if e_id in state_map
}
return state
@@ -416,6 +415,7 @@ class StateHandler(object):
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_store(
+ self.clock,
event.room_id,
room_version,
state_set_ids,
@@ -423,7 +423,7 @@ class StateHandler(object):
state_res_store=StateResolutionStore(self.store),
)
- new_state = {key: state_map[ev_id] for key, ev_id in iteritems(new_state)}
+ new_state = {key: state_map[ev_id] for key, ev_id in new_state.items()}
return new_state
@@ -505,8 +505,8 @@ class StateResolutionHandler(object):
# resolve_events_with_store do it?
new_state = {}
conflicted_state = False
- for st in itervalues(state_groups_ids):
- for key, e_id in iteritems(st):
+ for st in state_groups_ids.values():
+ for key, e_id in st.items():
if key in new_state:
conflicted_state = True
break
@@ -518,9 +518,10 @@ class StateResolutionHandler(object):
logger.info("Resolving conflicted state for %r", room_id)
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_store(
+ self.clock,
room_id,
room_version,
- list(itervalues(state_groups_ids)),
+ list(state_groups_ids.values()),
event_map=event_map,
state_res_store=state_res_store,
)
@@ -561,12 +562,12 @@ def _make_state_cache_entry(new_state, state_groups_ids):
# not get persisted.
# first look for exact matches
- new_state_event_ids = set(itervalues(new_state))
- for sg, state in iteritems(state_groups_ids):
+ new_state_event_ids = set(new_state.values())
+ for sg, state in state_groups_ids.items():
if len(new_state_event_ids) != len(state):
continue
- old_state_event_ids = set(itervalues(state))
+ old_state_event_ids = set(state.values())
if new_state_event_ids == old_state_event_ids:
# got an exact match.
return _StateCacheEntry(state=new_state, state_group=sg)
@@ -579,8 +580,8 @@ def _make_state_cache_entry(new_state, state_groups_ids):
prev_group = None
delta_ids = None
- for old_group, old_state in iteritems(state_groups_ids):
- n_delta_ids = {k: v for k, v in iteritems(new_state) if old_state.get(k) != v}
+ for old_group, old_state in state_groups_ids.items():
+ n_delta_ids = {k: v for k, v in new_state.items() if old_state.get(k) != v}
if not delta_ids or len(n_delta_ids) < len(delta_ids):
prev_group = old_group
delta_ids = n_delta_ids
@@ -591,6 +592,7 @@ def _make_state_cache_entry(new_state, state_groups_ids):
def resolve_events_with_store(
+ clock: Clock,
room_id: str,
room_version: str,
state_sets: List[StateMap[str]],
@@ -627,7 +629,7 @@ def resolve_events_with_store(
)
else:
return v2.resolve_events_with_store(
- room_id, room_version, state_sets, event_map, state_res_store
+ clock, room_id, room_version, state_sets, event_map, state_res_store
)
diff --git a/synapse/state/v1.py b/synapse/state/v1.py
index 9bf98d06f2..7b531a8337 100644
--- a/synapse/state/v1.py
+++ b/synapse/state/v1.py
@@ -17,8 +17,6 @@ import hashlib
import logging
from typing import Callable, Dict, List, Optional
-from six import iteritems, iterkeys, itervalues
-
from twisted.internet import defer
from synapse import event_auth
@@ -70,11 +68,11 @@ def resolve_events_with_store(
unconflicted_state, conflicted_state = _seperate(state_sets)
needed_events = {
- event_id for event_ids in itervalues(conflicted_state) for event_id in event_ids
+ event_id for event_ids in conflicted_state.values() for event_id in event_ids
}
needed_event_count = len(needed_events)
if event_map is not None:
- needed_events -= set(iterkeys(event_map))
+ needed_events -= set(event_map.keys())
logger.info(
"Asking for %d/%d conflicted events", len(needed_events), needed_event_count
@@ -102,11 +100,11 @@ def resolve_events_with_store(
unconflicted_state, conflicted_state, state_map
)
- new_needed_events = set(itervalues(auth_events))
+ new_needed_events = set(auth_events.values())
new_needed_event_count = len(new_needed_events)
new_needed_events -= needed_events
if event_map is not None:
- new_needed_events -= set(iterkeys(event_map))
+ new_needed_events -= set(event_map.keys())
logger.info(
"Asking for %d/%d auth events", len(new_needed_events), new_needed_event_count
@@ -152,7 +150,7 @@ def _seperate(state_sets):
conflicted_state = {}
for state_set in state_set_iterator:
- for key, value in iteritems(state_set):
+ for key, value in state_set.items():
# Check if there is an unconflicted entry for the state key.
unconflicted_value = unconflicted_state.get(key)
if unconflicted_value is None:
@@ -178,7 +176,7 @@ def _seperate(state_sets):
def _create_auth_events_from_maps(unconflicted_state, conflicted_state, state_map):
auth_events = {}
- for event_ids in itervalues(conflicted_state):
+ for event_ids in conflicted_state.values():
for event_id in event_ids:
if event_id in state_map:
keys = event_auth.auth_types_for_event(state_map[event_id])
@@ -194,7 +192,7 @@ def _resolve_with_state(
unconflicted_state_ids, conflicted_state_ids, auth_event_ids, state_map
):
conflicted_state = {}
- for key, event_ids in iteritems(conflicted_state_ids):
+ for key, event_ids in conflicted_state_ids.items():
events = [state_map[ev_id] for ev_id in event_ids if ev_id in state_map]
if len(events) > 1:
conflicted_state[key] = events
@@ -203,7 +201,7 @@ def _resolve_with_state(
auth_events = {
key: state_map[ev_id]
- for key, ev_id in iteritems(auth_event_ids)
+ for key, ev_id in auth_event_ids.items()
if ev_id in state_map
}
@@ -214,7 +212,7 @@ def _resolve_with_state(
raise
new_state = unconflicted_state_ids
- for key, event in iteritems(resolved_state):
+ for key, event in resolved_state.items():
new_state[key] = event.event_id
return new_state
@@ -238,21 +236,21 @@ def _resolve_state_events(conflicted_state, auth_events):
auth_events.update(resolved_state)
- for key, events in iteritems(conflicted_state):
+ for key, events in conflicted_state.items():
if key[0] == EventTypes.JoinRules:
logger.debug("Resolving conflicted join rules %r", events)
resolved_state[key] = _resolve_auth_events(events, auth_events)
auth_events.update(resolved_state)
- for key, events in iteritems(conflicted_state):
+ for key, events in conflicted_state.items():
if key[0] == EventTypes.Member:
logger.debug("Resolving conflicted member lists %r", events)
resolved_state[key] = _resolve_auth_events(events, auth_events)
auth_events.update(resolved_state)
- for key, events in iteritems(conflicted_state):
+ for key, events in conflicted_state.items():
if key not in resolved_state:
logger.debug("Resolving conflicted state %r:%r", key, events)
resolved_state[key] = _resolve_normal_events(events, auth_events)
diff --git a/synapse/state/v2.py b/synapse/state/v2.py
index 18484e2fa6..bf6caa0946 100644
--- a/synapse/state/v2.py
+++ b/synapse/state/v2.py
@@ -18,8 +18,6 @@ import itertools
import logging
from typing import Dict, List, Optional
-from six import iteritems, itervalues
-
from twisted.internet import defer
import synapse.state
@@ -29,12 +27,20 @@ from synapse.api.errors import AuthError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.types import StateMap
+from synapse.util import Clock
logger = logging.getLogger(__name__)
+# We want to yield to the reactor occasionally during state res when dealing
+# with large data sets, so that we don't exhaust the reactor. This is done by
+# yielding to reactor during loops every N iterations.
+_YIELD_AFTER_ITERATIONS = 100
+
+
@defer.inlineCallbacks
def resolve_events_with_store(
+ clock: Clock,
room_id: str,
room_version: str,
state_sets: List[StateMap[str]],
@@ -44,13 +50,11 @@ def resolve_events_with_store(
"""Resolves the state using the v2 state resolution algorithm
Args:
+ clock
room_id: the room we are working in
-
room_version: The room version
-
state_sets: List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.
-
event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
@@ -87,7 +91,7 @@ def resolve_events_with_store(
full_conflicted_set = set(
itertools.chain(
- itertools.chain.from_iterable(itervalues(conflicted_state)), auth_diff
+ itertools.chain.from_iterable(conflicted_state.values()), auth_diff
)
)
@@ -115,13 +119,14 @@ def resolve_events_with_store(
)
sorted_power_events = yield _reverse_topological_power_sort(
- room_id, power_events, event_map, state_res_store, full_conflicted_set
+ clock, room_id, power_events, event_map, state_res_store, full_conflicted_set
)
logger.debug("sorted %d power events", len(sorted_power_events))
# Now sequentially auth each one
resolved_state = yield _iterative_auth_checks(
+ clock,
room_id,
room_version,
sorted_power_events,
@@ -135,20 +140,22 @@ def resolve_events_with_store(
# OK, so we've now resolved the power events. Now sort the remaining
# events using the mainline of the resolved power level.
+ set_power_events = set(sorted_power_events)
leftover_events = [
- ev_id for ev_id in full_conflicted_set if ev_id not in sorted_power_events
+ ev_id for ev_id in full_conflicted_set if ev_id not in set_power_events
]
logger.debug("sorting %d remaining events", len(leftover_events))
pl = resolved_state.get((EventTypes.PowerLevels, ""), None)
leftover_events = yield _mainline_sort(
- room_id, leftover_events, pl, event_map, state_res_store
+ clock, room_id, leftover_events, pl, event_map, state_res_store
)
logger.debug("resolving remaining events")
resolved_state = yield _iterative_auth_checks(
+ clock,
room_id,
room_version,
leftover_events,
@@ -318,12 +325,13 @@ def _add_event_and_auth_chain_to_graph(
@defer.inlineCallbacks
def _reverse_topological_power_sort(
- room_id, event_ids, event_map, state_res_store, auth_diff
+ clock, room_id, event_ids, event_map, state_res_store, auth_diff
):
"""Returns a list of the event_ids sorted by reverse topological ordering,
and then by power level and origin_server_ts
Args:
+ clock (Clock)
room_id (str): the room we are working in
event_ids (list[str]): The events to sort
event_map (dict[str,FrozenEvent])
@@ -335,18 +343,28 @@ def _reverse_topological_power_sort(
"""
graph = {}
- for event_id in event_ids:
+ for idx, event_id in enumerate(event_ids, start=1):
yield _add_event_and_auth_chain_to_graph(
graph, room_id, event_id, event_map, state_res_store, auth_diff
)
+ # We yield occasionally when we're working with large data sets to
+ # ensure that we don't block the reactor loop for too long.
+ if idx % _YIELD_AFTER_ITERATIONS == 0:
+ yield clock.sleep(0)
+
event_to_pl = {}
- for event_id in graph:
+ for idx, event_id in enumerate(graph, start=1):
pl = yield _get_power_level_for_sender(
room_id, event_id, event_map, state_res_store
)
event_to_pl[event_id] = pl
+ # We yield occasionally when we're working with large data sets to
+ # ensure that we don't block the reactor loop for too long.
+ if idx % _YIELD_AFTER_ITERATIONS == 0:
+ yield clock.sleep(0)
+
def _get_power_order(event_id):
ev = event_map[event_id]
pl = event_to_pl[event_id]
@@ -362,12 +380,13 @@ def _reverse_topological_power_sort(
@defer.inlineCallbacks
def _iterative_auth_checks(
- room_id, room_version, event_ids, base_state, event_map, state_res_store
+ clock, room_id, room_version, event_ids, base_state, event_map, state_res_store
):
"""Sequentially apply auth checks to each event in given list, updating the
state as it goes along.
Args:
+ clock (Clock)
room_id (str)
room_version (str)
event_ids (list[str]): Ordered list of events to apply auth checks to
@@ -381,7 +400,7 @@ def _iterative_auth_checks(
resolved_state = base_state.copy()
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
- for event_id in event_ids:
+ for idx, event_id in enumerate(event_ids, start=1):
event = event_map[event_id]
auth_events = {}
@@ -419,17 +438,23 @@ def _iterative_auth_checks(
except AuthError:
pass
+ # We yield occasionally when we're working with large data sets to
+ # ensure that we don't block the reactor loop for too long.
+ if idx % _YIELD_AFTER_ITERATIONS == 0:
+ yield clock.sleep(0)
+
return resolved_state
@defer.inlineCallbacks
def _mainline_sort(
- room_id, event_ids, resolved_power_event_id, event_map, state_res_store
+ clock, room_id, event_ids, resolved_power_event_id, event_map, state_res_store
):
"""Returns a sorted list of event_ids sorted by mainline ordering based on
the given event resolved_power_event_id
Args:
+ clock (Clock)
room_id (str): room we're working in
event_ids (list[str]): Events to sort
resolved_power_event_id (str): The final resolved power level event ID
@@ -439,8 +464,14 @@ def _mainline_sort(
Returns:
Deferred[list[str]]: The sorted list
"""
+ if not event_ids:
+ # It's possible for there to be no event IDs here to sort, so we can
+ # skip calculating the mainline in that case.
+ return []
+
mainline = []
pl = resolved_power_event_id
+ idx = 0
while pl:
mainline.append(pl)
pl_ev = yield _get_event(room_id, pl, event_map, state_res_store)
@@ -454,17 +485,29 @@ def _mainline_sort(
pl = aid
break
+ # We yield occasionally when we're working with large data sets to
+ # ensure that we don't block the reactor loop for too long.
+ if idx != 0 and idx % _YIELD_AFTER_ITERATIONS == 0:
+ yield clock.sleep(0)
+
+ idx += 1
+
mainline_map = {ev_id: i + 1 for i, ev_id in enumerate(reversed(mainline))}
event_ids = list(event_ids)
order_map = {}
- for ev_id in event_ids:
+ for idx, ev_id in enumerate(event_ids, start=1):
depth = yield _get_mainline_depth_for_event(
event_map[ev_id], mainline_map, event_map, state_res_store
)
order_map[ev_id] = (depth, event_map[ev_id].origin_server_ts, ev_id)
+ # We yield occasionally when we're working with large data sets to
+ # ensure that we don't block the reactor loop for too long.
+ if idx % _YIELD_AFTER_ITERATIONS == 0:
+ yield clock.sleep(0)
+
event_ids.sort(key=lambda ev_id: order_map[ev_id])
return event_ids
@@ -572,7 +615,7 @@ def lexicographical_topological_sort(graph, key):
# `(key(node), node)` so that sorting does the right thing
zero_outdegree = []
- for node, edges in iteritems(graph):
+ for node, edges in graph.items():
if len(edges) == 0:
zero_outdegree.append((key(node), node))
diff --git a/synapse/static/client/login/index.html b/synapse/static/client/login/index.html
index 6fefdaaff7..9e6daf38ac 100644
--- a/synapse/static/client/login/index.html
+++ b/synapse/static/client/login/index.html
@@ -1,24 +1,24 @@
<!doctype html>
<html>
<head>
-<title> Login </title>
-<meta name='viewport' content='width=device-width, initial-scale=1, user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
-<link rel="stylesheet" href="style.css">
-<script src="js/jquery-3.4.1.min.js"></script>
-<script src="js/login.js"></script>
+ <meta http-equiv="Content-Type" content="text/html; charset=utf-8">
+ <title> Login </title>
+ <meta name='viewport' content='width=device-width, initial-scale=1, user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
+ <link rel="stylesheet" href="style.css">
+ <script src="js/jquery-3.4.1.min.js"></script>
+ <script src="js/login.js"></script>
</head>
<body onload="matrixLogin.onLoad()">
- <center>
- <br/>
+ <div id="container">
<h1 id="title"></h1>
- <span id="feedback" style="color: #f00"></span>
+ <span id="feedback"></span>
<div id="loading">
<img src="spinner.gif" />
</div>
- <div id="sso_flow" class="login_flow" style="display:none">
+ <div id="sso_flow" class="login_flow" style="display: none;">
Single-sign on:
<form id="sso_form" action="/_matrix/client/r0/login/sso/redirect" method="get">
<input id="sso_redirect_url" type="hidden" name="redirectUrl" value=""/>
@@ -26,9 +26,9 @@
</form>
</div>
- <div id="password_flow" class="login_flow" style="display:none">
+ <div id="password_flow" class="login_flow" style="display: none;">
Password Authentication:
- <form onsubmit="matrixLogin.password_login(); return false;">
+ <form onsubmit="matrixLogin.passwordLogin(); return false;">
<input id="user_id" size="32" type="text" placeholder="Matrix ID (e.g. bob)" autocapitalize="off" autocorrect="off" />
<br/>
<input id="password" size="32" type="password" placeholder="Password"/>
@@ -38,9 +38,9 @@
</form>
</div>
- <div id="no_login_types" type="button" class="login_flow" style="display:none">
+ <div id="no_login_types" type="button" class="login_flow" style="display: none;">
Log in currently unavailable.
</div>
- </center>
+ </div>
</body>
</html>
diff --git a/synapse/static/client/login/js/login.js b/synapse/static/client/login/js/login.js
index ba8048b23f..3678670ec7 100644
--- a/synapse/static/client/login/js/login.js
+++ b/synapse/static/client/login/js/login.js
@@ -5,11 +5,11 @@ window.matrixLogin = {
};
// Titles get updated through the process to give users feedback.
-var TITLE_PRE_AUTH = "Log in with one of the following methods";
-var TITLE_POST_AUTH = "Logging in...";
+const TITLE_PRE_AUTH = "Log in with one of the following methods";
+const TITLE_POST_AUTH = "Logging in...";
// The cookie used to store the original query parameters when using SSO.
-var COOKIE_KEY = "synapse_login_fallback_qs";
+const COOKIE_KEY = "synapse_login_fallback_qs";
/*
* Submit a login request.
@@ -20,9 +20,9 @@ var COOKIE_KEY = "synapse_login_fallback_qs";
* login request, e.g. device_id.
* callback: (Optional) Function to call on successful login.
*/
-var submitLogin = function(type, data, extra, callback) {
+function submitLogin(type, data, extra, callback) {
console.log("Logging in with " + type);
- set_title(TITLE_POST_AUTH);
+ setTitle(TITLE_POST_AUTH);
// Add the login type.
data.type = type;
@@ -41,12 +41,15 @@ var submitLogin = function(type, data, extra, callback) {
}
matrixLogin.onLogin(response);
}).fail(errorFunc);
-};
+}
-var errorFunc = function(err) {
+/*
+ * Display an error to the user and show the login form again.
+ */
+function errorFunc(err) {
// We want to show the error to the user rather than redirecting immediately to the
// SSO portal (if SSO is the only login option), so we inhibit the redirect.
- show_login(true);
+ showLogin(true);
if (err.responseJSON && err.responseJSON.error) {
setFeedbackString(err.responseJSON.error + " (" + err.responseJSON.errcode + ")");
@@ -54,27 +57,42 @@ var errorFunc = function(err) {
else {
setFeedbackString("Request failed: " + err.status);
}
-};
+}
-var setFeedbackString = function(text) {
+/*
+ * Display an error to the user.
+ */
+function setFeedbackString(text) {
$("#feedback").text(text);
-};
+}
-var show_login = function(inhibit_redirect) {
- // Set the redirect to come back to this page, a login token will get added
- // and handled after the redirect.
- var this_page = window.location.origin + window.location.pathname;
- $("#sso_redirect_url").val(this_page);
+/*
+ * (Maybe) Show the login forms.
+ *
+ * This actually does a few unrelated functions:
+ *
+ * * Configures the SSO redirect URL to come back to this page.
+ * * Configures and shows the SSO form, if the server supports SSO.
+ * * Otherwise, shows the password form.
+ */
+function showLogin(inhibitRedirect) {
+ setTitle(TITLE_PRE_AUTH);
- // If inhibit_redirect is false, and SSO is the only supported login method,
+ // If inhibitRedirect is false, and SSO is the only supported login method,
// we can redirect straight to the SSO page.
if (matrixLogin.serverAcceptsSso) {
+ // Set the redirect to come back to this page, a login token will get
+ // added as a query parameter and handled after the redirect.
+ $("#sso_redirect_url").val(window.location.origin + window.location.pathname);
+
// Before submitting SSO, set the current query parameters into a cookie
// for retrieval later.
var qs = parseQsFromUrl();
setCookie(COOKIE_KEY, JSON.stringify(qs));
- if (!inhibit_redirect && !matrixLogin.serverAcceptsPassword) {
+ // If password is not supported and redirects are allowed, then submit
+ // the form (redirecting to the SSO provider).
+ if (!inhibitRedirect && !matrixLogin.serverAcceptsPassword) {
$("#sso_form").submit();
return;
}
@@ -87,30 +105,39 @@ var show_login = function(inhibit_redirect) {
$("#password_flow").show();
}
+ // If neither password or SSO are supported, show an error to the user.
if (!matrixLogin.serverAcceptsPassword && !matrixLogin.serverAcceptsSso) {
$("#no_login_types").show();
}
- set_title(TITLE_PRE_AUTH);
-
$("#loading").hide();
-};
+}
-var show_spinner = function() {
+/*
+ * Hides the forms and shows a loading throbber.
+ */
+function showSpinner() {
$("#password_flow").hide();
$("#sso_flow").hide();
$("#no_login_types").hide();
$("#loading").show();
-};
+}
-var set_title = function(title) {
+/*
+ * Helper to show the page's main title.
+ */
+function setTitle(title) {
$("#title").text(title);
-};
+}
-var fetch_info = function(cb) {
+/*
+ * Query the login endpoint for the homeserver's supported flows.
+ *
+ * This populates matrixLogin.serverAccepts* variables.
+ */
+function fetchLoginFlows(cb) {
$.get(matrixLogin.endpoint, function(response) {
- var serverAcceptsPassword = false;
- for (var i=0; i<response.flows.length; i++) {
+ for (var i = 0; i < response.flows.length; i++) {
var flow = response.flows[i];
if ("m.login.sso" === flow.type) {
matrixLogin.serverAcceptsSso = true;
@@ -126,27 +153,41 @@ var fetch_info = function(cb) {
}).fail(errorFunc);
}
+/*
+ * Called on load to fetch login flows and attempt SSO login (if a token is available).
+ */
matrixLogin.onLoad = function() {
- fetch_info(function() {
- if (!try_token()) {
- show_login(false);
+ fetchLoginFlows(function() {
+ // (Maybe) attempt logging in via SSO if a token is available.
+ if (!tryTokenLogin()) {
+ showLogin(false);
}
});
};
-matrixLogin.password_login = function() {
+/*
+ * Submit simple user & password login.
+ */
+matrixLogin.passwordLogin = function() {
var user = $("#user_id").val();
var pwd = $("#password").val();
setFeedbackString("");
- show_spinner();
+ showSpinner();
submitLogin(
"m.login.password",
{user: user, password: pwd},
parseQsFromUrl());
};
+/*
+ * The onLogin function gets called after a succesful login.
+ *
+ * It is expected that implementations override this to be notified when the
+ * login is complete. The response to the login call is provided as the single
+ * parameter.
+ */
matrixLogin.onLogin = function(response) {
// clobber this function
console.warn("onLogin - This function should be replaced to proceed.");
@@ -155,7 +196,7 @@ matrixLogin.onLogin = function(response) {
/*
* Process the query parameters from the current URL into an object.
*/
-var parseQsFromUrl = function() {
+function parseQsFromUrl() {
var pos = window.location.href.indexOf("?");
if (pos == -1) {
return {};
@@ -174,12 +215,12 @@ var parseQsFromUrl = function() {
result[key] = val;
});
return result;
-};
+}
/*
* Process the cookies and return an object.
*/
-var parseCookies = function() {
+function parseCookies() {
var allCookies = document.cookie;
var result = {};
allCookies.split(";").forEach(function(part) {
@@ -196,32 +237,32 @@ var parseCookies = function() {
result[key] = val;
});
return result;
-};
+}
/*
* Set a cookie that is valid for 1 hour.
*/
-var setCookie = function(key, value) {
+function setCookie(key, value) {
// The maximum age is set in seconds.
var maxAge = 60 * 60;
// Set the cookie, this defaults to the current domain and path.
document.cookie = key + "=" + encodeURIComponent(value) + ";max-age=" + maxAge + ";sameSite=lax";
-};
+}
/*
* Removes a cookie by key.
*/
-var deleteCookie = function(key) {
+function deleteCookie(key) {
// Delete a cookie by setting the expiration to 0. (Note that the value
// doesn't matter.)
document.cookie = key + "=deleted;expires=0";
-};
+}
/*
* Submits the login token if one is found in the query parameters. Returns a
* boolean of whether the login token was found or not.
*/
-var try_token = function() {
+function tryTokenLogin() {
// Check if the login token is in the query parameters.
var qs = parseQsFromUrl();
@@ -233,18 +274,18 @@ var try_token = function() {
// Retrieve the original query parameters (from before the SSO redirect).
// They are stored as JSON in a cookie.
var cookies = parseCookies();
- var original_query_params = JSON.parse(cookies[COOKIE_KEY] || "{}")
+ var originalQueryParams = JSON.parse(cookies[COOKIE_KEY] || "{}")
// If the login is successful, delete the cookie.
- var callback = function() {
+ function callback() {
deleteCookie(COOKIE_KEY);
}
submitLogin(
"m.login.token",
{token: loginToken},
- original_query_params,
+ originalQueryParams,
callback);
return true;
-};
+}
diff --git a/synapse/static/client/login/style.css b/synapse/static/client/login/style.css
index 1cce5ed950..83e4f6abc8 100644
--- a/synapse/static/client/login/style.css
+++ b/synapse/static/client/login/style.css
@@ -31,20 +31,44 @@ form {
margin: 10px 0 0 0;
}
+/*
+ * Add some padding to the viewport.
+ */
+#container {
+ padding: 10px;
+}
+/*
+ * Center all direct children of the main form.
+ */
+#container > * {
+ display: block;
+ margin-left: auto;
+ margin-right: auto;
+ text-align: center;
+}
+
+/*
+ * A wrapper around each login flow.
+ */
.login_flow {
width: 300px;
text-align: left;
padding: 10px;
margin-bottom: 40px;
- -webkit-border-radius: 10px;
- -moz-border-radius: 10px;
border-radius: 10px;
-
- -webkit-box-shadow: 0px 0px 20px 0px rgba(0,0,0,0.15);
- -moz-box-shadow: 0px 0px 20px 0px rgba(0,0,0,0.15);
box-shadow: 0px 0px 20px 0px rgba(0,0,0,0.15);
background-color: #f8f8f8;
border: 1px #ccc solid;
}
+
+/*
+ * Used to show error content.
+ */
+#feedback {
+ /* Red text. */
+ color: #ff0000;
+ /* A little space to not overlap the box-shadow. */
+ margin-bottom: 20px;
+}
diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py
index 71f8d43a76..995d4764a9 100644
--- a/synapse/storage/data_stores/main/client_ips.py
+++ b/synapse/storage/data_stores/main/client_ips.py
@@ -15,8 +15,6 @@
import logging
-from six import iteritems
-
from twisted.internet import defer
from synapse.metrics.background_process_metrics import wrap_as_background_process
@@ -421,7 +419,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
):
self.database_engine.lock_table(txn, "user_ips")
- for entry in iteritems(to_update):
+ for entry in to_update.items():
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
try:
@@ -530,7 +528,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
"user_agent": user_agent,
"last_seen": last_seen,
}
- for (access_token, ip), (user_agent, last_seen) in iteritems(results)
+ for (access_token, ip), (user_agent, last_seen) in results.items()
]
@wrap_as_background_process("prune_old_user_ips")
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index fb9f798e29..0ff0542453 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -17,8 +17,6 @@
import logging
from typing import List, Optional, Set, Tuple
-from six import iteritems
-
from canonicaljson import json
from twisted.internet import defer
@@ -208,7 +206,7 @@ class DeviceWorkerStore(SQLBaseStore):
)
# add the updated cross-signing keys to the results list
- for user_id, result in iteritems(cross_signing_keys_by_user):
+ for user_id, result in cross_signing_keys_by_user.items():
result["user_id"] = user_id
# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
results.append(("org.matrix.signing_key_update", result))
@@ -269,7 +267,7 @@ class DeviceWorkerStore(SQLBaseStore):
)
results = []
- for user_id, user_devices in iteritems(devices):
+ for user_id, user_devices in devices.items():
# The prev_id for the first row is always the last row before
# `from_stream_id`
prev_id = yield self._get_last_device_update_for_remote_user(
@@ -493,7 +491,7 @@ class DeviceWorkerStore(SQLBaseStore):
if devices:
user_devices = devices[user_id]
results = []
- for device_id, device in iteritems(user_devices):
+ for device_id, device in user_devices.items():
result = {"device_id": device_id}
key_json = device.get("key_json", None)
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index 20698bfd16..1a0842d4b0 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -16,8 +16,6 @@
# limitations under the License.
from typing import Dict, List
-from six import iteritems
-
from canonicaljson import encode_canonical_json, json
from twisted.enterprise.adbapi import Connection
@@ -64,9 +62,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
# Build the result structure, un-jsonify the results, and add the
# "unsigned" section
rv = {}
- for user_id, device_keys in iteritems(results):
+ for user_id, device_keys in results.items():
rv[user_id] = {}
- for device_id, device_info in iteritems(device_keys):
+ for device_id, device_info in device_keys.items():
r = db_to_json(device_info.pop("key_json"))
r["unsigned"] = {}
display_name = device_info["device_display_name"]
diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py
index 24ce8c4330..a6bb3221ff 100644
--- a/synapse/storage/data_stores/main/event_federation.py
+++ b/synapse/storage/data_stores/main/event_federation.py
@@ -14,10 +14,9 @@
# limitations under the License.
import itertools
import logging
+from queue import Empty, PriorityQueue
from typing import Dict, List, Optional, Set, Tuple
-from six.moves.queue import Empty, PriorityQueue
-
from twisted.internet import defer
from synapse.api.errors import StoreError
diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py
index 0321274de2..bc9f4f08ea 100644
--- a/synapse/storage/data_stores/main/event_push_actions.py
+++ b/synapse/storage/data_stores/main/event_push_actions.py
@@ -16,8 +16,6 @@
import logging
-from six import iteritems
-
from canonicaljson import json
from twisted.internet import defer
@@ -455,7 +453,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
sql,
(
_gen_entry(user_id, actions)
- for user_id, actions in iteritems(user_id_actions)
+ for user_id, actions in user_id_actions.items()
),
)
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index a6572571b4..cfd24d2f06 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -21,9 +21,6 @@ from collections import OrderedDict, namedtuple
from functools import wraps
from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
-from six import integer_types, iteritems, text_type
-from six.moves import range
-
import attr
from canonicaljson import json
from prometheus_client import Counter
@@ -232,10 +229,10 @@ class PersistEventsStore:
event_counter.labels(event.type, origin_type, origin_entity).inc()
- for room_id, new_state in iteritems(current_state_for_room):
+ for room_id, new_state in current_state_for_room.items():
self.store.get_current_state_ids.prefill((room_id,), new_state)
- for room_id, latest_event_ids in iteritems(new_forward_extremeties):
+ for room_id, latest_event_ids in new_forward_extremeties.items():
self.store.get_latest_event_ids_in_room.prefill(
(room_id,), list(latest_event_ids)
)
@@ -461,7 +458,7 @@ class PersistEventsStore:
state_delta_by_room: Dict[str, DeltaState],
stream_id: int,
):
- for room_id, delta_state in iteritems(state_delta_by_room):
+ for room_id, delta_state in state_delta_by_room.items():
to_delete = delta_state.to_delete
to_insert = delta_state.to_insert
@@ -545,7 +542,7 @@ class PersistEventsStore:
""",
[
(room_id, key[0], key[1], ev_id, ev_id)
- for key, ev_id in iteritems(to_insert)
+ for key, ev_id in to_insert.items()
],
)
@@ -642,7 +639,7 @@ class PersistEventsStore:
def _update_forward_extremities_txn(
self, txn, new_forward_extremities, max_stream_order
):
- for room_id, new_extrem in iteritems(new_forward_extremities):
+ for room_id, new_extrem in new_forward_extremities.items():
self.db.simple_delete_txn(
txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
)
@@ -655,7 +652,7 @@ class PersistEventsStore:
table="event_forward_extremities",
values=[
{"event_id": ev_id, "room_id": room_id}
- for room_id, new_extrem in iteritems(new_forward_extremities)
+ for room_id, new_extrem in new_forward_extremities.items()
for ev_id in new_extrem
],
)
@@ -672,7 +669,7 @@ class PersistEventsStore:
"event_id": event_id,
"stream_ordering": max_stream_order,
}
- for room_id, new_extrem in iteritems(new_forward_extremities)
+ for room_id, new_extrem in new_forward_extremities.items()
for event_id in new_extrem
],
)
@@ -727,7 +724,7 @@ class PersistEventsStore:
event.depth, depth_updates.get(event.room_id, event.depth)
)
- for room_id, depth in iteritems(depth_updates):
+ for room_id, depth in depth_updates.items():
self._update_min_depth_for_room_txn(txn, room_id, depth)
def _update_outliers_txn(self, txn, events_and_contexts):
@@ -893,8 +890,7 @@ class PersistEventsStore:
"received_ts": self._clock.time_msec(),
"sender": event.sender,
"contains_url": (
- "url" in event.content
- and isinstance(event.content["url"], text_type)
+ "url" in event.content and isinstance(event.content["url"], str)
),
}
for event, _ in events_and_contexts
@@ -1345,10 +1341,10 @@ class PersistEventsStore:
):
if (
"min_lifetime" in event.content
- and not isinstance(event.content.get("min_lifetime"), integer_types)
+ and not isinstance(event.content.get("min_lifetime"), int)
) or (
"max_lifetime" in event.content
- and not isinstance(event.content.get("max_lifetime"), integer_types)
+ and not isinstance(event.content.get("max_lifetime"), int)
):
# Ignore the event if one of the value isn't an integer.
return
@@ -1497,11 +1493,11 @@ class PersistEventsStore:
table="event_to_state_groups",
values=[
{"state_group": state_group_id, "event_id": event_id}
- for event_id, state_group_id in iteritems(state_groups)
+ for event_id, state_group_id in state_groups.items()
],
)
- for event_id, state_group_id in iteritems(state_groups):
+ for event_id, state_group_id in state_groups.items():
txn.call_after(
self.store._get_state_group_for_event.prefill,
(event_id,),
diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py
index f54c8b1ee0..62d28f44dc 100644
--- a/synapse/storage/data_stores/main/events_bg_updates.py
+++ b/synapse/storage/data_stores/main/events_bg_updates.py
@@ -15,8 +15,6 @@
import logging
-from six import text_type
-
from canonicaljson import json
from twisted.internet import defer
@@ -133,7 +131,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
contains_url = "url" in content
if contains_url:
- contains_url &= isinstance(content["url"], text_type)
+ contains_url &= isinstance(content["url"], str)
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 213d69100a..a48c7a96ca 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -1077,9 +1077,32 @@ class EventsWorkerStore(SQLBaseStore):
"get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
)
- def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
+ async def get_all_new_backfill_event_rows(
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, list]], int, bool]:
+ """Get updates for backfill replication stream, including all new
+ backfilled events and events that have gone from being outliers to not.
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
+ """
if last_id == current_id:
- return defer.succeed([])
+ return [], current_id, False
def get_all_new_backfill_event_rows(txn):
sql = (
@@ -1094,10 +1117,12 @@ class EventsWorkerStore(SQLBaseStore):
" LIMIT ?"
)
txn.execute(sql, (-last_id, -current_id, limit))
- new_event_updates = txn.fetchall()
+ new_event_updates = [(row[0], row[1:]) for row in txn]
+ limited = False
if len(new_event_updates) == limit:
upper_bound = new_event_updates[-1][0]
+ limited = True
else:
upper_bound = current_id
@@ -1114,11 +1139,15 @@ class EventsWorkerStore(SQLBaseStore):
" ORDER BY event_stream_ordering DESC"
)
txn.execute(sql, (-last_id, -upper_bound))
- new_event_updates.extend(txn.fetchall())
+ new_event_updates.extend((row[0], row[1:]) for row in txn)
- return new_event_updates
+ if len(new_event_updates) >= limit:
+ upper_bound = new_event_updates[-1][0]
+ limited = True
- return self.db.runInteraction(
+ return new_event_updates, upper_bound, limited
+
+ return await self.db.runInteraction(
"get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
)
diff --git a/synapse/storage/data_stores/main/media_repository.py b/synapse/storage/data_stores/main/media_repository.py
index 8aecd414c2..15bc13cbd0 100644
--- a/synapse/storage/data_stores/main/media_repository.py
+++ b/synapse/storage/data_stores/main/media_repository.py
@@ -81,6 +81,15 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
desc="store_local_media",
)
+ def mark_local_media_as_safe(self, media_id: str):
+ """Mark a local media as safe from quarantining."""
+ return self.db.simple_update_one(
+ table="local_media_repository",
+ keyvalues={"media_id": media_id},
+ updatevalues={"safe_from_quarantine": True},
+ desc="mark_local_media_as_safe",
+ )
+
def get_url_cache(self, url, ts):
"""Get the media_id and ts for a cached URL as of the given timestamp
Returns:
diff --git a/synapse/storage/data_stores/main/presence.py b/synapse/storage/data_stores/main/presence.py
index dab31e0c2d..7574612619 100644
--- a/synapse/storage/data_stores/main/presence.py
+++ b/synapse/storage/data_stores/main/presence.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from typing import List, Tuple
+
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
@@ -73,9 +75,32 @@ class PresenceStore(SQLBaseStore):
)
txn.execute(sql + clause, [stream_id] + list(args))
- def get_all_presence_updates(self, last_id, current_id, limit):
+ async def get_all_presence_updates(
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, list]], int, bool]:
+ """Get updates for presence replication stream.
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
+ """
+
if last_id == current_id:
- return defer.succeed([])
+ return [], current_id, False
def get_all_presence_updates_txn(txn):
sql = """
@@ -89,9 +114,17 @@ class PresenceStore(SQLBaseStore):
LIMIT ?
"""
txn.execute(sql, (last_id, current_id, limit))
- return txn.fetchall()
+ updates = [(row[0], row[1:]) for row in txn]
+
+ upper_bound = current_id
+ limited = False
+ if len(updates) >= limit:
+ upper_bound = updates[-1][0]
+ limited = True
+
+ return updates, upper_bound, limited
- return self.db.runInteraction(
+ return await self.db.runInteraction(
"get_all_presence_updates", get_all_presence_updates_txn
)
diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py
index ef8f40959f..f6e78ca590 100644
--- a/synapse/storage/data_stores/main/push_rule.py
+++ b/synapse/storage/data_stores/main/push_rule.py
@@ -16,7 +16,7 @@
import abc
import logging
-from typing import Union
+from typing import List, Tuple, Union
from canonicaljson import json
@@ -348,23 +348,53 @@ class PushRulesWorkerStore(
results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
return results
- def get_all_push_rule_updates(self, last_id, current_id, limit):
- """Get all the push rules changes that have happend on the server"""
+ async def get_all_push_rule_updates(
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+ """Get updates for push_rules replication stream.
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
+ """
+
if last_id == current_id:
- return defer.succeed([])
+ return [], current_id, False
def get_all_push_rule_updates_txn(txn):
- sql = (
- "SELECT stream_id, event_stream_ordering, user_id, rule_id,"
- " op, priority_class, priority, conditions, actions"
- " FROM push_rules_stream"
- " WHERE ? < stream_id AND stream_id <= ?"
- " ORDER BY stream_id ASC LIMIT ?"
- )
+ sql = """
+ SELECT stream_id, user_id
+ FROM push_rules_stream
+ WHERE ? < stream_id AND stream_id <= ?
+ ORDER BY stream_id ASC
+ LIMIT ?
+ """
txn.execute(sql, (last_id, current_id, limit))
- return txn.fetchall()
+ updates = [(stream_id, (user_id,)) for stream_id, user_id in txn]
+
+ limited = False
+ upper_bound = current_id
+ if len(updates) == limit:
+ limited = True
+ upper_bound = updates[-1][0]
+
+ return updates, upper_bound, limited
- return self.db.runInteraction(
+ return await self.db.runInteraction(
"get_all_push_rule_updates", get_all_push_rule_updates_txn
)
diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index cebdcd409f..8f5505bd67 100644
--- a/synapse/storage/data_stores/main/receipts.py
+++ b/synapse/storage/data_stores/main/receipts.py
@@ -16,6 +16,7 @@
import abc
import logging
+from typing import List, Tuple
from canonicaljson import json
@@ -24,6 +25,7 @@ from twisted.internet import defer
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage.database import Database
from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -266,26 +268,79 @@ class ReceiptsWorkerStore(SQLBaseStore):
}
return results
- def get_all_updated_receipts(self, last_id, current_id, limit=None):
+ def get_users_sent_receipts_between(self, last_id: int, current_id: int):
+ """Get all users who sent receipts between `last_id` exclusive and
+ `current_id` inclusive.
+
+ Returns:
+ Deferred[List[str]]
+ """
+
if last_id == current_id:
return defer.succeed([])
- def get_all_updated_receipts_txn(txn):
- sql = (
- "SELECT stream_id, room_id, receipt_type, user_id, event_id, data"
- " FROM receipts_linearized"
- " WHERE ? < stream_id AND stream_id <= ?"
- " ORDER BY stream_id ASC"
- )
- args = [last_id, current_id]
- if limit is not None:
- sql += " LIMIT ?"
- args.append(limit)
- txn.execute(sql, args)
+ def _get_users_sent_receipts_between_txn(txn):
+ sql = """
+ SELECT DISTINCT user_id FROM receipts_linearized
+ WHERE ? < stream_id AND stream_id <= ?
+ """
+ txn.execute(sql, (last_id, current_id))
- return [r[0:5] + (json.loads(r[5]),) for r in txn]
+ return [r[0] for r in txn]
return self.db.runInteraction(
+ "get_users_sent_receipts_between", _get_users_sent_receipts_between_txn
+ )
+
+ async def get_all_updated_receipts(
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, list]], int, bool]:
+ """Get updates for receipts replication stream.
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
+ """
+
+ if last_id == current_id:
+ return [], current_id, False
+
+ def get_all_updated_receipts_txn(txn):
+ sql = """
+ SELECT stream_id, room_id, receipt_type, user_id, event_id, data
+ FROM receipts_linearized
+ WHERE ? < stream_id AND stream_id <= ?
+ ORDER BY stream_id ASC
+ LIMIT ?
+ """
+ txn.execute(sql, (last_id, current_id, limit))
+
+ updates = [(r[0], r[1:5] + (json.loads(r[5]),)) for r in txn]
+
+ limited = False
+ upper_bound = current_id
+
+ if len(updates) == limit:
+ limited = True
+ upper_bound = updates[-1][0]
+
+ return updates, upper_bound, limited
+
+ return await self.db.runInteraction(
"get_all_updated_receipts", get_all_updated_receipts_txn
)
@@ -300,10 +355,10 @@ class ReceiptsWorkerStore(SQLBaseStore):
room_id, None, update_metrics=False
)
- # first handle the Deferred case
- if isinstance(res, defer.Deferred):
- if res.called:
- res = res.result
+ # first handle the ObservableDeferred case
+ if isinstance(res, ObservableDeferred):
+ if res.has_called():
+ res = res.get_result()
else:
res = None
diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index 9768981891..587d4b91c1 100644
--- a/synapse/storage/data_stores/main/registration.py
+++ b/synapse/storage/data_stores/main/registration.py
@@ -19,8 +19,6 @@ import logging
import re
from typing import Optional
-from six import iterkeys
-
from twisted.internet import defer
from twisted.internet.defer import Deferred
@@ -753,7 +751,7 @@ class RegistrationWorkerStore(SQLBaseStore):
last_send_attempt, validated_at
FROM threepid_validation_session WHERE %s
""" % (
- " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),
+ " AND ".join("%s = ?" % k for k in keyvalues.keys()),
)
if validated is not None:
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index 46f643c6b9..13e366536a 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -626,36 +626,10 @@ class RoomWorkerStore(SQLBaseStore):
def _quarantine_media_in_room_txn(txn):
local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
- total_media_quarantined = 0
-
- # Now update all the tables to set the quarantined_by flag
-
- txn.executemany(
- """
- UPDATE local_media_repository
- SET quarantined_by = ?
- WHERE media_id = ?
- """,
- ((quarantined_by, media_id) for media_id in local_mxcs),
- )
-
- txn.executemany(
- """
- UPDATE remote_media_cache
- SET quarantined_by = ?
- WHERE media_origin = ? AND media_id = ?
- """,
- (
- (quarantined_by, origin, media_id)
- for origin, media_id in remote_mxcs
- ),
+ return self._quarantine_media_txn(
+ txn, local_mxcs, remote_mxcs, quarantined_by
)
- total_media_quarantined += len(local_mxcs)
- total_media_quarantined += len(remote_mxcs)
-
- return total_media_quarantined
-
return self.db.runInteraction(
"quarantine_media_in_room", _quarantine_media_in_room_txn
)
@@ -805,17 +779,17 @@ class RoomWorkerStore(SQLBaseStore):
Returns:
The total number of media items quarantined
"""
- total_media_quarantined = 0
-
# Update all the tables to set the quarantined_by flag
txn.executemany(
"""
UPDATE local_media_repository
SET quarantined_by = ?
- WHERE media_id = ?
+ WHERE media_id = ? AND safe_from_quarantine = ?
""",
- ((quarantined_by, media_id) for media_id in local_mxcs),
+ ((quarantined_by, media_id, False) for media_id in local_mxcs),
)
+ # Note that a rowcount of -1 can be used to indicate no rows were affected.
+ total_media_quarantined = txn.rowcount if txn.rowcount > 0 else 0
txn.executemany(
"""
@@ -825,9 +799,7 @@ class RoomWorkerStore(SQLBaseStore):
""",
((quarantined_by, origin, media_id) for origin, media_id in remote_mxcs),
)
-
- total_media_quarantined += len(local_mxcs)
- total_media_quarantined += len(remote_mxcs)
+ total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
return total_media_quarantined
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index 137ebac833..44bab65eac 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -17,8 +17,6 @@
import logging
from typing import Iterable, List, Set
-from six import iteritems, itervalues
-
from canonicaljson import json
from twisted.internet import defer
@@ -544,7 +542,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
users_in_room = {}
member_event_ids = [
e_id
- for key, e_id in iteritems(current_state_ids)
+ for key, e_id in current_state_ids.items()
if key[0] == EventTypes.Member
]
@@ -561,7 +559,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
users_in_room = dict(prev_res)
member_event_ids = [
e_id
- for key, e_id in iteritems(context.delta_ids)
+ for key, e_id in context.delta_ids.items()
if key[0] == EventTypes.Member
]
for etype, state_key in context.delta_ids:
@@ -1101,7 +1099,7 @@ class _JoinedHostsCache(object):
if state_entry.state_group == self.state_group:
pass
elif state_entry.prev_group == self.state_group:
- for (typ, state_key), event_id in iteritems(state_entry.delta_ids):
+ for (typ, state_key), event_id in state_entry.delta_ids.items():
if typ != EventTypes.Member:
continue
@@ -1131,7 +1129,7 @@ class _JoinedHostsCache(object):
self.state_group = state_entry.state_group
else:
self.state_group = object()
- self._len = sum(len(v) for v in itervalues(self.hosts_to_joined_users))
+ self._len = sum(len(v) for v in self.hosts_to_joined_users.values())
return frozenset(self.hosts_to_joined_users)
def __len__(self):
diff --git a/synapse/storage/data_stores/main/schema/delta/30/as_users.py b/synapse/storage/data_stores/main/schema/delta/30/as_users.py
index 9b95411fb6..b42c02710a 100644
--- a/synapse/storage/data_stores/main/schema/delta/30/as_users.py
+++ b/synapse/storage/data_stores/main/schema/delta/30/as_users.py
@@ -13,8 +13,6 @@
# limitations under the License.
import logging
-from six.moves import range
-
from synapse.config.appservice import load_appservices
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.postgres b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.postgres
new file mode 100644
index 0000000000..597f2ffd3d
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.postgres
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- The local_media_repository should have files which do not get quarantined,
+-- e.g. files from sticker packs.
+ALTER TABLE local_media_repository ADD COLUMN safe_from_quarantine BOOLEAN NOT NULL DEFAULT FALSE;
diff --git a/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.sqlite b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.sqlite
new file mode 100644
index 0000000000..69db89ac0e
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.sqlite
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- The local_media_repository should have files which do not get quarantined,
+-- e.g. files from sticker packs.
+ALTER TABLE local_media_repository ADD COLUMN safe_from_quarantine BOOLEAN NOT NULL DEFAULT 0;
diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index 13f49d8060..a8381dc577 100644
--- a/synapse/storage/data_stores/main/search.py
+++ b/synapse/storage/data_stores/main/search.py
@@ -17,8 +17,6 @@ import logging
import re
from collections import namedtuple
-from six import string_types
-
from canonicaljson import json
from twisted.internet import defer
@@ -180,7 +178,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
# skip over it.
continue
- if not isinstance(value, string_types):
+ if not isinstance(value, str):
# If the event body, name or topic isn't a string
# then skip over it
continue
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index e89f0bffb5..379d758b5d 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -40,8 +40,6 @@ import abc
import logging
from collections import namedtuple
-from six.moves import range
-
from twisted.internet import defer
from synapse.logging.context import make_deferred_yieldable, run_in_background
diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index 4219018302..f8c776be3f 100644
--- a/synapse/storage/data_stores/main/tags.py
+++ b/synapse/storage/data_stores/main/tags.py
@@ -16,8 +16,6 @@
import logging
-from six.moves import range
-
from canonicaljson import json
from twisted.internet import defer
diff --git a/synapse/storage/data_stores/main/ui_auth.py b/synapse/storage/data_stores/main/ui_auth.py
index 1d8ee22fb1..ec2f38c373 100644
--- a/synapse/storage/data_stores/main/ui_auth.py
+++ b/synapse/storage/data_stores/main/ui_auth.py
@@ -186,7 +186,7 @@ class UIAuthWorkerStore(SQLBaseStore):
# The clientdict gets stored as JSON.
clientdict_json = json.dumps(clientdict)
- self.db.simple_update_one(
+ await self.db.simple_update_one(
table="ui_auth_sessions",
keyvalues={"session_id": session_id},
updatevalues={"clientdict": clientdict_json},
diff --git a/synapse/storage/data_stores/state/bg_updates.py b/synapse/storage/data_stores/state/bg_updates.py
index ff000bc9ec..be1fe97d79 100644
--- a/synapse/storage/data_stores/state/bg_updates.py
+++ b/synapse/storage/data_stores/state/bg_updates.py
@@ -15,8 +15,6 @@
import logging
-from six import iteritems
-
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore
@@ -280,7 +278,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
delta_state = {
key: value
- for key, value in iteritems(curr_state)
+ for key, value in curr_state.items()
if prev_state.get(key, None) != value
}
@@ -316,7 +314,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
"state_key": key[1],
"event_id": state_id,
}
- for key, state_id in iteritems(delta_state)
+ for key, state_id in delta_state.items()
],
)
diff --git a/synapse/storage/data_stores/state/store.py b/synapse/storage/data_stores/state/store.py
index f3ad1e4369..5db9f20135 100644
--- a/synapse/storage/data_stores/state/store.py
+++ b/synapse/storage/data_stores/state/store.py
@@ -17,9 +17,6 @@ import logging
from collections import namedtuple
from typing import Dict, Iterable, List, Set, Tuple
-from six import iteritems
-from six.moves import range
-
from twisted.internet import defer
from synapse.api.constants import EventTypes
@@ -263,7 +260,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
# And finally update the result dict, by filtering out any extra
# stuff we pulled out of the database.
- for group, group_state_dict in iteritems(group_to_state_dict):
+ for group, group_state_dict in group_to_state_dict.items():
# We just replace any existing entries, as we will have loaded
# everything we need from the database anyway.
state[group] = state_filter.filter_state(group_state_dict)
@@ -341,11 +338,11 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
else:
non_member_types = non_member_filter.concrete_types()
- for group, group_state_dict in iteritems(group_to_state_dict):
+ for group, group_state_dict in group_to_state_dict.items():
state_dict_members = {}
state_dict_non_members = {}
- for k, v in iteritems(group_state_dict):
+ for k, v in group_state_dict.items():
if k[0] == EventTypes.Member:
state_dict_members[k] = v
else:
@@ -432,7 +429,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
"state_key": key[1],
"event_id": state_id,
}
- for key, state_id in iteritems(delta_ids)
+ for key, state_id in delta_ids.items()
],
)
else:
@@ -447,7 +444,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
"state_key": key[1],
"event_id": state_id,
}
- for key, state_id in iteritems(current_state_ids)
+ for key, state_id in current_state_ids.items()
],
)
@@ -458,7 +455,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
current_member_state_ids = {
s: ev
- for (s, ev) in iteritems(current_state_ids)
+ for (s, ev) in current_state_ids.items()
if s[0] == EventTypes.Member
}
txn.call_after(
@@ -470,7 +467,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
current_non_member_state_ids = {
s: ev
- for (s, ev) in iteritems(current_state_ids)
+ for (s, ev) in current_state_ids.items()
if s[0] != EventTypes.Member
}
txn.call_after(
@@ -555,7 +552,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
"state_key": key[1],
"event_id": state_id,
}
- for key, state_id in iteritems(curr_state)
+ for key, state_id in curr_state.items()
],
)
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index b112ff3df2..3be20c866a 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -16,6 +16,7 @@
# limitations under the License.
import logging
import time
+from sys import intern
from time import monotonic as monotonic_time
from typing import (
Any,
@@ -29,9 +30,6 @@ from typing import (
TypeVar,
)
-from six import iteritems, iterkeys, itervalues
-from six.moves import intern, range
-
from prometheus_client import Histogram
from twisted.enterprise import adbapi
@@ -259,7 +257,7 @@ class PerformanceCounters(object):
def interval(self, interval_duration_secs, limit=3):
counters = []
- for name, (count, cum_time) in iteritems(self.current_counters):
+ for name, (count, cum_time) in self.current_counters.items():
prev_count, prev_time = self.previous_counters.get(name, (0, 0))
counters.append(
(
@@ -1053,7 +1051,7 @@ class Database(object):
sql = ("SELECT %(retcol)s FROM %(table)s") % {"retcol": retcol, "table": table}
if keyvalues:
- sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues))
+ sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
txn.execute(sql, list(keyvalues.values()))
else:
txn.execute(sql)
@@ -1191,7 +1189,7 @@ class Database(object):
clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable)
clauses = [clause]
- for key, value in iteritems(keyvalues):
+ for key, value in keyvalues.items():
clauses.append("%s = ?" % (key,))
values.append(value)
@@ -1212,7 +1210,7 @@ class Database(object):
@staticmethod
def simple_update_txn(txn, table, keyvalues, updatevalues):
if keyvalues:
- where = "WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues))
+ where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
else:
where = ""
@@ -1351,7 +1349,7 @@ class Database(object):
clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable)
clauses = [clause]
- for key, value in iteritems(keyvalues):
+ for key, value in keyvalues.items():
clauses.append("%s = ?" % (key,))
values.append(value)
@@ -1388,7 +1386,7 @@ class Database(object):
txn.close()
if cache:
- min_val = min(itervalues(cache))
+ min_val = min(cache.values())
else:
min_val = max_value
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index f159400a87..ec894a91cb 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -20,9 +20,6 @@ import logging
from collections import deque, namedtuple
from typing import Iterable, List, Optional, Set, Tuple
-from six import iteritems
-from six.moves import range
-
from prometheus_client import Counter, Histogram
from twisted.internet import defer
@@ -218,7 +215,7 @@ class EventsPersistenceStorage(object):
partitioned.setdefault(event.room_id, []).append((event, ctx))
deferreds = []
- for room_id, evs_ctxs in iteritems(partitioned):
+ for room_id, evs_ctxs in partitioned.items():
d = self._event_persist_queue.add_to_queue(
room_id, evs_ctxs, backfilled=backfilled
)
@@ -319,7 +316,7 @@ class EventsPersistenceStorage(object):
(event, context)
)
- for room_id, ev_ctx_rm in iteritems(events_by_room):
+ for room_id, ev_ctx_rm in events_by_room.items():
latest_event_ids = await self.main_store.get_latest_event_ids_in_room(
room_id
)
@@ -674,7 +671,7 @@ class EventsPersistenceStorage(object):
to_insert = {
key: ev_id
- for key, ev_id in iteritems(current_state)
+ for key, ev_id in current_state.items()
if ev_id != existing_state.get(key)
}
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index c522c80922..dc568476f4 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -16,8 +16,6 @@
import logging
from typing import Iterable, List, TypeVar
-from six import iteritems, itervalues
-
import attr
from twisted.internet import defer
@@ -51,7 +49,7 @@ class StateFilter(object):
# If `include_others` is set we canonicalise the filter by removing
# wildcards from the types dictionary
if self.include_others:
- self.types = {k: v for k, v in iteritems(self.types) if v is not None}
+ self.types = {k: v for k, v in self.types.items() if v is not None}
@staticmethod
def all():
@@ -150,7 +148,7 @@ class StateFilter(object):
has_non_member_wildcard = self.include_others or any(
state_keys is None
- for t, state_keys in iteritems(self.types)
+ for t, state_keys in self.types.items()
if t != EventTypes.Member
)
@@ -199,7 +197,7 @@ class StateFilter(object):
# First we build up a lost of clauses for each type/state_key combo
clauses = []
- for etype, state_keys in iteritems(self.types):
+ for etype, state_keys in self.types.items():
if state_keys is None:
clauses.append("(type = ?)")
where_args.append(etype)
@@ -251,7 +249,7 @@ class StateFilter(object):
return dict(state_dict)
filtered_state = {}
- for k, v in iteritems(state_dict):
+ for k, v in state_dict.items():
typ, state_key = k
if typ in self.types:
state_keys = self.types[typ]
@@ -279,7 +277,7 @@ class StateFilter(object):
"""
return self.include_others or any(
- state_keys is None for state_keys in itervalues(self.types)
+ state_keys is None for state_keys in self.types.values()
)
def concrete_types(self):
@@ -292,7 +290,7 @@ class StateFilter(object):
"""
return [
(t, s)
- for t, state_keys in iteritems(self.types)
+ for t, state_keys in self.types.items()
if state_keys is not None
for s in state_keys
]
@@ -324,7 +322,7 @@ class StateFilter(object):
member_filter = StateFilter.none()
non_member_filter = StateFilter(
- types={k: v for k, v in iteritems(self.types) if k != EventTypes.Member},
+ types={k: v for k, v in self.types.items() if k != EventTypes.Member},
include_others=self.include_others,
)
@@ -366,7 +364,7 @@ class StateGroupStorage(object):
event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
- groups = set(itervalues(event_to_groups))
+ groups = set(event_to_groups.values())
group_to_state = yield self.stores.state._get_state_for_groups(groups)
return group_to_state
@@ -400,8 +398,8 @@ class StateGroupStorage(object):
state_event_map = yield self.stores.main.get_events(
[
ev_id
- for group_ids in itervalues(group_to_ids)
- for ev_id in itervalues(group_ids)
+ for group_ids in group_to_ids.values()
+ for ev_id in group_ids.values()
],
get_prev_content=False,
)
@@ -409,10 +407,10 @@ class StateGroupStorage(object):
return {
group: [
state_event_map[v]
- for v in itervalues(event_id_map)
+ for v in event_id_map.values()
if v in state_event_map
]
- for group, event_id_map in iteritems(group_to_ids)
+ for group, event_id_map in group_to_ids.items()
}
def _get_state_groups_from_groups(
@@ -444,23 +442,23 @@ class StateGroupStorage(object):
"""
event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
- groups = set(itervalues(event_to_groups))
+ groups = set(event_to_groups.values())
group_to_state = yield self.stores.state._get_state_for_groups(
groups, state_filter
)
state_event_map = yield self.stores.main.get_events(
- [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)],
+ [ev_id for sd in group_to_state.values() for ev_id in sd.values()],
get_prev_content=False,
)
event_to_state = {
event_id: {
k: state_event_map[v]
- for k, v in iteritems(group_to_state[group])
+ for k, v in group_to_state[group].items()
if v in state_event_map
}
- for event_id, group in iteritems(event_to_groups)
+ for event_id, group in event_to_groups.items()
}
return {event: event_to_state[event] for event in event_ids}
@@ -481,14 +479,14 @@ class StateGroupStorage(object):
"""
event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
- groups = set(itervalues(event_to_groups))
+ groups = set(event_to_groups.values())
group_to_state = yield self.stores.state._get_state_for_groups(
groups, state_filter
)
event_to_state = {
event_id: group_to_state[group]
- for event_id, group in iteritems(event_to_groups)
+ for event_id, group in event_to_groups.items()
}
return {event: event_to_state[event] for event in event_ids}
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index f7af2bca7f..65abf0846e 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -19,8 +19,6 @@ import logging
from contextlib import contextmanager
from typing import Dict, Sequence, Set, Union
-from six.moves import range
-
import attr
from twisted.internet import defer
@@ -95,7 +93,7 @@ class ObservableDeferred(object):
This returns a brand new deferred that is resolved when the underlying
deferred is resolved. Interacting with the returned deferred does not
- effect the underdlying deferred.
+ effect the underlying deferred.
"""
if not self._result:
d = defer.Deferred()
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index cd48262420..64f35fc288 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -21,8 +21,6 @@ import threading
from typing import Any, Tuple, Union, cast
from weakref import WeakValueDictionary
-from six import itervalues
-
from prometheus_client import Gauge
from typing_extensions import Protocol
@@ -281,7 +279,7 @@ class Cache(object):
def invalidate_all(self):
self.check_thread()
self.cache.clear()
- for entry in itervalues(self._pending_deferred_cache):
+ for entry in self._pending_deferred_cache.values():
entry.invalidate()
self._pending_deferred_cache.clear()
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 2726b67b6d..89a3420f92 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -16,8 +16,6 @@
import logging
from collections import OrderedDict
-from six import iteritems, itervalues
-
from synapse.config import cache as cache_config
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import register_cache
@@ -150,7 +148,7 @@ class ExpiringCache(object):
keys_to_delete = set()
- for key, cache_entry in iteritems(self._cache):
+ for key, cache_entry in self._cache.items():
if now - cache_entry.time > self._expiry_ms:
keys_to_delete.add(key)
@@ -170,7 +168,7 @@ class ExpiringCache(object):
def __len__(self):
if self.iterable:
- return sum(len(entry.value) for entry in itervalues(self._cache))
+ return sum(len(entry.value) for entry in self._cache.values())
else:
return len(self._cache)
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 2a161bf244..c541bf4579 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -17,8 +17,6 @@ import logging
import math
from typing import Dict, FrozenSet, List, Mapping, Optional, Set, Union
-from six import integer_types
-
from sortedcontainers import SortedDict
from synapse.types import Collection
@@ -88,7 +86,7 @@ class StreamChangeCache:
def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool:
"""Returns True if the entity may have been updated since stream_pos
"""
- assert type(stream_pos) in integer_types
+ assert isinstance(stream_pos, int)
if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
index 2ea4e4e911..ecd9948e79 100644
--- a/synapse/util/caches/treecache.py
+++ b/synapse/util/caches/treecache.py
@@ -1,7 +1,5 @@
from typing import Dict
-from six import itervalues
-
SENTINEL = object()
@@ -81,7 +79,7 @@ def iterate_tree_cache_entry(d):
can contain dicts.
"""
if isinstance(d, dict):
- for value_d in itervalues(d):
+ for value_d in d.values():
for value in iterate_tree_cache_entry(value_d):
yield value
else:
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 8b17d1c8b8..6a3f6177b1 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from six.moves import queue
+import queue
from twisted.internet import threads
diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py
index 9815bb8667..eab78dd256 100644
--- a/synapse/util/frozenutils.py
+++ b/synapse/util/frozenutils.py
@@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from six import binary_type, text_type
-
from canonicaljson import json
from frozendict import frozendict
@@ -26,7 +24,7 @@ def freeze(o):
if isinstance(o, frozendict):
return o
- if isinstance(o, (binary_type, text_type)):
+ if isinstance(o, (bytes, str)):
return o
try:
@@ -41,7 +39,7 @@ def unfreeze(o):
if isinstance(o, (dict, frozendict)):
return dict({k: unfreeze(v) for k, v in o.items()})
- if isinstance(o, (binary_type, text_type)):
+ if isinstance(o, (bytes, str)):
return o
try:
diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py
index 9bf6a44f75..023beb5ede 100644
--- a/synapse/util/wheel_timer.py
+++ b/synapse/util/wheel_timer.py
@@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from six.moves import range
-
class _Entry(object):
__slots__ = ["end_key", "queue"]
diff --git a/synapse/visibility.py b/synapse/visibility.py
index bab41182b9..3dfd4af26c 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -16,9 +16,6 @@
import logging
import operator
-from six import iteritems, itervalues
-from six.moves import map
-
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
@@ -298,7 +295,7 @@ def filter_events_for_server(
# membership states for the requesting server to determine
# if the server is either in the room or has been invited
# into the room.
- for ev in itervalues(state):
+ for ev in state.values():
if ev.type != EventTypes.Member:
continue
try:
@@ -332,7 +329,7 @@ def filter_events_for_server(
)
visibility_ids = set()
- for sids in itervalues(event_to_state_ids):
+ for sids in event_to_state_ids.values():
hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
if hist:
visibility_ids.add(hist)
@@ -345,7 +342,7 @@ def filter_events_for_server(
event_map = yield storage.main.get_events(visibility_ids)
all_open = all(
e.content.get("history_visibility") in (None, "shared", "world_readable")
- for e in itervalues(event_map)
+ for e in event_map.values()
)
if not check_history_visibility_only:
@@ -394,8 +391,8 @@ def filter_events_for_server(
#
event_id_to_state_key = {
event_id: key
- for key_to_eid in itervalues(event_to_state_ids)
- for key, event_id in iteritems(key_to_eid)
+ for key_to_eid in event_to_state_ids.values()
+ for key, event_id in key_to_eid.items()
}
def include(typ, state_key):
@@ -409,20 +406,16 @@ def filter_events_for_server(
return state_key[idx + 1 :] == server_name
event_map = yield storage.main.get_events(
- [
- e_id
- for e_id, key in iteritems(event_id_to_state_key)
- if include(key[0], key[1])
- ]
+ [e_id for e_id, key in event_id_to_state_key.items() if include(key[0], key[1])]
)
event_to_state = {
e_id: {
key: event_map[inner_e_id]
- for key, inner_e_id in iteritems(key_to_eid)
+ for key, inner_e_id in key_to_eid.items()
if inner_e_id in event_map
}
- for e_id, key_to_eid in iteritems(event_to_state_ids)
+ for e_id, key_to_eid in event_to_state_ids.items()
}
to_return = []
|