diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 8f37d2cf3b..6856dab06c 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -59,6 +59,8 @@ class JoinRules:
KNOCK = "knock"
INVITE = "invite"
PRIVATE = "private"
+ # As defined for MSC3083.
+ MSC3083_RESTRICTED = "restricted"
class LoginType:
diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py
index c3f07bc1a3..2244b8a340 100644
--- a/synapse/api/ratelimiting.py
+++ b/synapse/api/ratelimiting.py
@@ -17,6 +17,7 @@ from collections import OrderedDict
from typing import Hashable, Optional, Tuple
from synapse.api.errors import LimitExceededError
+from synapse.storage.databases.main import DataStore
from synapse.types import Requester
from synapse.util import Clock
@@ -31,10 +32,13 @@ class Ratelimiter:
burst_count: How many actions that can be performed before being limited.
"""
- def __init__(self, clock: Clock, rate_hz: float, burst_count: int):
+ def __init__(
+ self, store: DataStore, clock: Clock, rate_hz: float, burst_count: int
+ ):
self.clock = clock
self.rate_hz = rate_hz
self.burst_count = burst_count
+ self.store = store
# A ordered dictionary keeping track of actions, when they were last
# performed and how often. Each entry is a mapping from a key of arbitrary type
@@ -46,45 +50,10 @@ class Ratelimiter:
OrderedDict()
) # type: OrderedDict[Hashable, Tuple[float, int, float]]
- def can_requester_do_action(
- self,
- requester: Requester,
- rate_hz: Optional[float] = None,
- burst_count: Optional[int] = None,
- update: bool = True,
- _time_now_s: Optional[int] = None,
- ) -> Tuple[bool, float]:
- """Can the requester perform the action?
-
- Args:
- requester: The requester to key off when rate limiting. The user property
- will be used.
- rate_hz: The long term number of actions that can be performed in a second.
- Overrides the value set during instantiation if set.
- burst_count: How many actions that can be performed before being limited.
- Overrides the value set during instantiation if set.
- update: Whether to count this check as performing the action
- _time_now_s: The current time. Optional, defaults to the current time according
- to self.clock. Only used by tests.
-
- Returns:
- A tuple containing:
- * A bool indicating if they can perform the action now
- * The reactor timestamp for when the action can be performed next.
- -1 if rate_hz is less than or equal to zero
- """
- # Disable rate limiting of users belonging to any AS that is configured
- # not to be rate limited in its registration file (rate_limited: true|false).
- if requester.app_service and not requester.app_service.is_rate_limited():
- return True, -1.0
-
- return self.can_do_action(
- requester.user.to_string(), rate_hz, burst_count, update, _time_now_s
- )
-
- def can_do_action(
+ async def can_do_action(
self,
- key: Hashable,
+ requester: Optional[Requester],
+ key: Optional[Hashable] = None,
rate_hz: Optional[float] = None,
burst_count: Optional[int] = None,
update: bool = True,
@@ -92,9 +61,16 @@ class Ratelimiter:
) -> Tuple[bool, float]:
"""Can the entity (e.g. user or IP address) perform the action?
+ Checks if the user has ratelimiting disabled in the database by looking
+ for null/zero values in the `ratelimit_override` table. (Non-zero
+ values aren't honoured, as they're specific to the event sending
+ ratelimiter, rather than all ratelimiters)
+
Args:
- key: The key we should use when rate limiting. Can be a user ID
- (when sending events), an IP address, etc.
+ requester: The requester that is doing the action, if any. Used to check
+ if the user has ratelimits disabled in the database.
+ key: An arbitrary key used to classify an action. Defaults to the
+ requester's user ID.
rate_hz: The long term number of actions that can be performed in a second.
Overrides the value set during instantiation if set.
burst_count: How many actions that can be performed before being limited.
@@ -109,6 +85,30 @@ class Ratelimiter:
* The reactor timestamp for when the action can be performed next.
-1 if rate_hz is less than or equal to zero
"""
+ if key is None:
+ if not requester:
+ raise ValueError("Must supply at least one of `requester` or `key`")
+
+ key = requester.user.to_string()
+
+ if requester:
+ # Disable rate limiting of users belonging to any AS that is configured
+ # not to be rate limited in its registration file (rate_limited: true|false).
+ if requester.app_service and not requester.app_service.is_rate_limited():
+ return True, -1.0
+
+ # Check if ratelimiting has been disabled for the user.
+ #
+ # Note that we don't use the returned rate/burst count, as the table
+ # is specifically for the event sending ratelimiter. Instead, we
+ # only use it to (somewhat cheekily) infer whether the user should
+ # be subject to any rate limiting or not.
+ override = await self.store.get_ratelimit_for_user(
+ requester.authenticated_entity
+ )
+ if override and not override.messages_per_second:
+ return True, -1.0
+
# Override default values if set
time_now_s = _time_now_s if _time_now_s is not None else self.clock.time()
rate_hz = rate_hz if rate_hz is not None else self.rate_hz
@@ -175,9 +175,10 @@ class Ratelimiter:
else:
del self.actions[key]
- def ratelimit(
+ async def ratelimit(
self,
- key: Hashable,
+ requester: Optional[Requester],
+ key: Optional[Hashable] = None,
rate_hz: Optional[float] = None,
burst_count: Optional[int] = None,
update: bool = True,
@@ -185,8 +186,16 @@ class Ratelimiter:
):
"""Checks if an action can be performed. If not, raises a LimitExceededError
+ Checks if the user has ratelimiting disabled in the database by looking
+ for null/zero values in the `ratelimit_override` table. (Non-zero
+ values aren't honoured, as they're specific to the event sending
+ ratelimiter, rather than all ratelimiters)
+
Args:
- key: An arbitrary key used to classify an action
+        requester: The requester that is doing the action, if any. Used to check
+            if the user has ratelimits disabled in the database.
+ key: An arbitrary key used to classify an action. Defaults to the
+ requester's user ID.
rate_hz: The long term number of actions that can be performed in a second.
Overrides the value set during instantiation if set.
burst_count: How many actions that can be performed before being limited.
@@ -201,7 +210,8 @@ class Ratelimiter:
"""
time_now_s = _time_now_s if _time_now_s is not None else self.clock.time()
- allowed, time_allowed = self.can_do_action(
+ allowed, time_allowed = await self.can_do_action(
+ requester,
key,
rate_hz=rate_hz,
burst_count=burst_count,
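The remaining hunks in this patch update every call site to the new signature. As a rough sketch of the new calling convention (not part of the patch; `hs`, `requester` and `request` are assumed to be in scope, as they are in the handlers touched below):

```python
from synapse.api.ratelimiting import Ratelimiter


async def ratelimit_examples(hs, requester, request):
    """Sketch only: illustrates the new requester/key split."""
    limiter = Ratelimiter(
        store=hs.get_datastore(),
        clock=hs.get_clock(),
        rate_hz=0.17,
        burst_count=3,
    )

    # Keyed on the requester's user ID (the default when `key` is omitted).
    # App services registered with `rate_limited: false` and users with a
    # null/zero row in `ratelimit_override` are never limited.
    allowed, retry_after_ts = await limiter.can_do_action(requester)

    # Keyed on something other than a user (e.g. an IP address): pass None
    # for the requester and supply the key explicitly.
    await limiter.ratelimit(None, request.getClientIP())
```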
diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
index de2cc15d33..87038d436d 100644
--- a/synapse/api/room_versions.py
+++ b/synapse/api/room_versions.py
@@ -57,7 +57,7 @@ class RoomVersion:
state_res = attr.ib(type=int) # one of the StateResolutionVersions
enforce_key_validity = attr.ib(type=bool)
- # bool: before MSC2261/MSC2432, m.room.aliases had special auth rules and redaction rules
+ # Before MSC2261/MSC2432, m.room.aliases had special auth rules and redaction rules
special_case_aliases_auth = attr.ib(type=bool)
# Strictly enforce canonicaljson, do not allow:
# * Integers outside the range of [-2 ^ 53 + 1, 2 ^ 53 - 1]
@@ -69,6 +69,8 @@ class RoomVersion:
limit_notifications_power_levels = attr.ib(type=bool)
# MSC2174/MSC2176: Apply updated redaction rules algorithm.
msc2176_redaction_rules = attr.ib(type=bool)
+ # MSC3083: Support the 'restricted' join_rule.
+ msc3083_join_rules = attr.ib(type=bool)
class RoomVersions:
@@ -82,6 +84,7 @@ class RoomVersions:
strict_canonicaljson=False,
limit_notifications_power_levels=False,
msc2176_redaction_rules=False,
+ msc3083_join_rules=False,
)
V2 = RoomVersion(
"2",
@@ -93,6 +96,7 @@ class RoomVersions:
strict_canonicaljson=False,
limit_notifications_power_levels=False,
msc2176_redaction_rules=False,
+ msc3083_join_rules=False,
)
V3 = RoomVersion(
"3",
@@ -104,6 +108,7 @@ class RoomVersions:
strict_canonicaljson=False,
limit_notifications_power_levels=False,
msc2176_redaction_rules=False,
+ msc3083_join_rules=False,
)
V4 = RoomVersion(
"4",
@@ -115,6 +120,7 @@ class RoomVersions:
strict_canonicaljson=False,
limit_notifications_power_levels=False,
msc2176_redaction_rules=False,
+ msc3083_join_rules=False,
)
V5 = RoomVersion(
"5",
@@ -126,6 +132,7 @@ class RoomVersions:
strict_canonicaljson=False,
limit_notifications_power_levels=False,
msc2176_redaction_rules=False,
+ msc3083_join_rules=False,
)
V6 = RoomVersion(
"6",
@@ -137,6 +144,7 @@ class RoomVersions:
strict_canonicaljson=True,
limit_notifications_power_levels=True,
msc2176_redaction_rules=False,
+ msc3083_join_rules=False,
)
MSC2176 = RoomVersion(
"org.matrix.msc2176",
@@ -148,6 +156,19 @@ class RoomVersions:
strict_canonicaljson=True,
limit_notifications_power_levels=True,
msc2176_redaction_rules=True,
+ msc3083_join_rules=False,
+ )
+ MSC3083 = RoomVersion(
+ "org.matrix.msc3083",
+ RoomDisposition.UNSTABLE,
+ EventFormatVersions.V3,
+ StateResolutionVersions.V2,
+ enforce_key_validity=True,
+ special_case_aliases_auth=False,
+ strict_canonicaljson=True,
+ limit_notifications_power_levels=True,
+ msc2176_redaction_rules=False,
+ msc3083_join_rules=True,
)
@@ -162,4 +183,5 @@ KNOWN_ROOM_VERSIONS = {
RoomVersions.V6,
RoomVersions.MSC2176,
)
+ # Note that we do not include MSC3083 here unless it is enabled in the config.
} # type: Dict[str, RoomVersion]
diff --git a/synapse/config/api.py b/synapse/config/api.py
index 74cd53a8ed..55c038c0c4 100644
--- a/synapse/config/api.py
+++ b/synapse/config/api.py
@@ -1,4 +1,4 @@
-# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2015-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,38 +12,131 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+from typing import Iterable
+
from synapse.api.constants import EventTypes
+from synapse.config._base import Config, ConfigError
+from synapse.config._util import validate_config
+from synapse.types import JsonDict
-from ._base import Config
+logger = logging.getLogger(__name__)
class ApiConfig(Config):
section = "api"
- def read_config(self, config, **kwargs):
- self.room_invite_state_types = config.get(
- "room_invite_state_types",
- [
- EventTypes.JoinRules,
- EventTypes.CanonicalAlias,
- EventTypes.RoomAvatar,
- EventTypes.RoomEncryption,
- EventTypes.Name,
- ],
+ def read_config(self, config: JsonDict, **kwargs):
+ validate_config(_MAIN_SCHEMA, config, ())
+ self.room_prejoin_state = list(self._get_prejoin_state_types(config))
+
+ def generate_config_section(cls, **kwargs) -> str:
+ formatted_default_state_types = "\n".join(
+ " # - %s" % (t,) for t in _DEFAULT_PREJOIN_STATE_TYPES
)
- def generate_config_section(cls, **kwargs):
return """\
## API Configuration ##
- # A list of event types that will be included in the room_invite_state
+ # Controls for the state that is shared with users who receive an invite
+ # to a room
#
- #room_invite_state_types:
- # - "{JoinRules}"
- # - "{CanonicalAlias}"
- # - "{RoomAvatar}"
- # - "{RoomEncryption}"
- # - "{Name}"
- """.format(
- **vars(EventTypes)
- )
+ room_prejoin_state:
+ # By default, the following state event types are shared with users who
+ # receive invites to the room:
+ #
+%(formatted_default_state_types)s
+ #
+ # Uncomment the following to disable these defaults (so that only the event
+ # types listed in 'additional_event_types' are shared). Defaults to 'false'.
+ #
+ #disable_default_event_types: true
+
+ # Additional state event types to share with users when they are invited
+ # to a room.
+ #
+ # By default, this list is empty (so only the default event types are shared).
+ #
+ #additional_event_types:
+ # - org.example.custom.event.type
+ """ % {
+ "formatted_default_state_types": formatted_default_state_types
+ }
+
+ def _get_prejoin_state_types(self, config: JsonDict) -> Iterable[str]:
+ """Get the event types to include in the prejoin state
+
+ Parses the config and returns an iterable of the event types to be included.
+ """
+ room_prejoin_state_config = config.get("room_prejoin_state") or {}
+
+ # backwards-compatibility support for room_invite_state_types
+ if "room_invite_state_types" in config:
+ # if both "room_invite_state_types" and "room_prejoin_state" are set, then
+ # we don't really know what to do.
+ if room_prejoin_state_config:
+ raise ConfigError(
+ "Can't specify both 'room_invite_state_types' and 'room_prejoin_state' "
+ "in config"
+ )
+
+ logger.warning(_ROOM_INVITE_STATE_TYPES_WARNING)
+
+ yield from config["room_invite_state_types"]
+ return
+
+ if not room_prejoin_state_config.get("disable_default_event_types"):
+ yield from _DEFAULT_PREJOIN_STATE_TYPES
+
+ if self.spaces_enabled:
+ # MSC1772 suggests adding m.room.create to the prejoin state
+ yield EventTypes.Create
+
+ yield from room_prejoin_state_config.get("additional_event_types", [])
+
+
+_ROOM_INVITE_STATE_TYPES_WARNING = """\
+WARNING: The 'room_invite_state_types' configuration setting is now deprecated,
+and replaced with 'room_prejoin_state'. New features may not work correctly
+unless 'room_invite_state_types' is removed. See the sample configuration file for
+details of 'room_prejoin_state'.
+--------------------------------------------------------------------------------
+"""
+
+_DEFAULT_PREJOIN_STATE_TYPES = [
+ EventTypes.JoinRules,
+ EventTypes.CanonicalAlias,
+ EventTypes.RoomAvatar,
+ EventTypes.RoomEncryption,
+ EventTypes.Name,
+]
+
+
+# room_prejoin_state can either be None (as it is in the default config), or
+# an object containing other config settings
+_ROOM_PREJOIN_STATE_CONFIG_SCHEMA = {
+ "oneOf": [
+ {
+ "type": "object",
+ "properties": {
+ "disable_default_event_types": {"type": "boolean"},
+ "additional_event_types": {
+ "type": "array",
+ "items": {"type": "string"},
+ },
+ },
+ },
+ {"type": "null"},
+ ]
+}
+
+# the legacy room_invite_state_types setting
+_ROOM_INVITE_STATE_TYPES_SCHEMA = {"type": "array", "items": {"type": "string"}}
+
+_MAIN_SCHEMA = {
+ "type": "object",
+ "properties": {
+ "room_prejoin_state": _ROOM_PREJOIN_STATE_CONFIG_SCHEMA,
+ "room_invite_state_types": _ROOM_INVITE_STATE_TYPES_SCHEMA,
+ },
+}
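A standalone sketch of how the new setting resolves. It mirrors `_get_prejoin_state_types` above rather than calling it (instantiating `ApiConfig` needs a full homeserver config), the `resolve_prejoin_state` helper is illustrative only, and the `spaces_enabled` addition of `m.room.create` is omitted for brevity:

```python
from synapse.api.constants import EventTypes

_DEFAULTS = [
    EventTypes.JoinRules,
    EventTypes.CanonicalAlias,
    EventTypes.RoomAvatar,
    EventTypes.RoomEncryption,
    EventTypes.Name,
]


def resolve_prejoin_state(config: dict) -> list:
    """Illustrative restatement of the parsing rules above."""
    prejoin = config.get("room_prejoin_state") or {}
    if "room_invite_state_types" in config:
        # Legacy setting: can't be combined with the new one (ConfigError in
        # the real class), otherwise it is used as-is with a deprecation
        # warning logged at startup.
        if prejoin:
            raise ValueError("Can't specify both settings")
        return list(config["room_invite_state_types"])
    types = [] if prejoin.get("disable_default_event_types") else list(_DEFAULTS)
    types.extend(prejoin.get("additional_event_types", []))
    return types


# Defaults plus a custom type:
resolve_prejoin_state(
    {"room_prejoin_state": {"additional_event_types": ["org.example.custom.event.type"]}}
)
# Only the custom type:
resolve_prejoin_state(
    {
        "room_prejoin_state": {
            "disable_default_event_types": True,
            "additional_event_types": ["org.example.custom.event.type"],
        }
    }
)
```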
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 86f4d9af9d..eb96ecda74 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
from synapse.config._base import Config
from synapse.types import JsonDict
@@ -27,7 +28,11 @@ class ExperimentalConfig(Config):
# MSC2858 (multiple SSO identity providers)
self.msc2858_enabled = experimental.get("msc2858_enabled", False) # type: bool
- # Spaces (MSC1772, MSC2946, etc)
+
+ # Spaces (MSC1772, MSC2946, MSC3083, etc)
self.spaces_enabled = experimental.get("spaces_enabled", False) # type: bool
+ if self.spaces_enabled:
+ KNOWN_ROOM_VERSIONS[RoomVersions.MSC3083.identifier] = RoomVersions.MSC3083
+
# MSC3026 (busy presence state)
self.msc3026_enabled = experimental.get("msc3026_enabled", False) # type: bool
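In other words, the unstable room version is only advertised once the experimental spaces flag is turned on. Roughly (the `experimental_features`/`spaces_enabled` YAML keys are assumed from this config section; the rest is illustrative):

```python
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions

# Before ExperimentalConfig.read_config runs with the flag set:
assert RoomVersions.MSC3083.identifier == "org.matrix.msc3083"
assert "org.matrix.msc3083" not in KNOWN_ROOM_VERSIONS

# With `spaces_enabled: true` under `experimental_features` in homeserver.yaml,
# read_config registers it, as above:
KNOWN_ROOM_VERSIONS[RoomVersions.MSC3083.identifier] = RoomVersions.MSC3083
assert "org.matrix.msc3083" in KNOWN_ROOM_VERSIONS
```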
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index ead007ba5a..f27d1e14ac 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -298,9 +298,9 @@ class RegistrationConfig(Config):
#
#allowed_local_3pids:
# - medium: email
- # pattern: '.*@matrix\\.org'
+ # pattern: '^[^@]+@matrix\\.org$'
# - medium: email
- # pattern: '.*@vector\\.im'
+ # pattern: '^[^@]+@vector\\.im$'
# - medium: msisdn
# pattern: '\\+44'
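The old sample patterns are unanchored, so they also match addresses that merely contain a matching substring; the anchored form restricts them to whole addresses with a single `@`. A quick standalone check of the difference:

```python
import re

old = r".*@matrix\.org"
new = r"^[^@]+@matrix\.org$"

# The unanchored pattern also matches a prefix of a longer domain.
assert re.match(old, "alice@matrix.org.evil.example")
assert not re.match(new, "alice@matrix.org.evil.example")
assert re.match(new, "alice@matrix.org")
```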
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index 91ad5b3d3c..9863953f5c 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -162,7 +162,7 @@ def check(
logger.debug("Auth events: %s", [a.event_id for a in auth_events.values()])
if event.type == EventTypes.Member:
- _is_membership_change_allowed(event, auth_events)
+ _is_membership_change_allowed(room_version_obj, event, auth_events)
logger.debug("Allowing! %s", event)
return
@@ -220,8 +220,19 @@ def _can_federate(event: EventBase, auth_events: StateMap[EventBase]) -> bool:
def _is_membership_change_allowed(
- event: EventBase, auth_events: StateMap[EventBase]
+ room_version: RoomVersion, event: EventBase, auth_events: StateMap[EventBase]
) -> None:
+ """
+ Confirms that the event which changes membership is an allowed change.
+
+ Args:
+ room_version: The version of the room.
+ event: The event to check.
+ auth_events: The current auth events of the room.
+
+ Raises:
+ AuthError if the event is not allowed.
+ """
membership = event.content["membership"]
# Check if this is the room creator joining:
@@ -315,14 +326,19 @@ def _is_membership_change_allowed(
if user_level < invite_level:
raise AuthError(403, "You don't have permission to invite users")
elif Membership.JOIN == membership:
- # Joins are valid iff caller == target and they were:
- # invited: They are accepting the invitation
- # joined: It's a NOOP
+ # Joins are valid iff caller == target and:
+ # * They are not banned.
+ # * They are accepting a previously sent invitation.
+ # * They are already joined (it's a NOOP).
+ # * The room is public or restricted.
if event.user_id != target_user_id:
raise AuthError(403, "Cannot force another user to join.")
elif target_banned:
raise AuthError(403, "You are banned from this room")
- elif join_rule == JoinRules.PUBLIC:
+ elif join_rule == JoinRules.PUBLIC or (
+ room_version.msc3083_join_rules
+ and join_rule == JoinRules.MSC3083_RESTRICTED
+ ):
pass
elif join_rule == JoinRules.INVITE:
if not caller_in_room and not caller_invited:
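At this layer, a room version with `msc3083_join_rules` treats a `restricted` join rule the same way as `public`; the MSC3083 restriction on who may actually join is not enforced here. A standalone restatement of the new branch:

```python
from synapse.api.constants import JoinRules
from synapse.api.room_versions import RoomVersions


def join_allowed_by_rule(room_version, join_rule: str) -> bool:
    # Mirrors the new branch in _is_membership_change_allowed: joins pass this
    # check for public rooms, and for "restricted" rooms when the room version
    # opts in via msc3083_join_rules.
    return join_rule == JoinRules.PUBLIC or (
        room_version.msc3083_join_rules
        and join_rule == JoinRules.MSC3083_RESTRICTED
    )


assert join_allowed_by_rule(RoomVersions.MSC3083, JoinRules.MSC3083_RESTRICTED)
assert not join_allowed_by_rule(RoomVersions.V6, JoinRules.MSC3083_RESTRICTED)
assert join_allowed_by_rule(RoomVersions.V6, JoinRules.PUBLIC)
```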
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index d84e362070..b9f8d966a6 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -739,22 +739,20 @@ class FederationServer(FederationBase):
await self.handler.on_receive_pdu(origin, pdu, sent_to_us_directly=True)
- def __str__(self):
+ def __str__(self) -> str:
return "<ReplicationLayer(%s)>" % self.server_name
async def exchange_third_party_invite(
self, sender_user_id: str, target_user_id: str, room_id: str, signed: Dict
- ):
- ret = await self.handler.exchange_third_party_invite(
+ ) -> None:
+ await self.handler.exchange_third_party_invite(
sender_user_id, target_user_id, room_id, signed
)
- return ret
- async def on_exchange_third_party_invite_request(self, event_dict: Dict):
- ret = await self.handler.on_exchange_third_party_invite_request(event_dict)
- return ret
+ async def on_exchange_third_party_invite_request(self, event_dict: Dict) -> None:
+ await self.handler.on_exchange_third_party_invite_request(event_dict)
- async def check_server_matches_acl(self, server_name: str, room_id: str):
+ async def check_server_matches_acl(self, server_name: str, room_id: str) -> None:
"""Check if the given server is allowed by the server ACLs in the room
Args:
@@ -870,6 +868,7 @@ class FederationHandlerRegistry:
# A rate limiter for incoming room key requests per origin.
self._room_key_request_rate_limiter = Ratelimiter(
+ store=hs.get_datastore(),
clock=self.clock,
rate_hz=self.config.rc_key_requests.per_second,
burst_count=self.config.rc_key_requests.burst_count,
@@ -877,7 +876,7 @@ class FederationHandlerRegistry:
def register_edu_handler(
self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]]
- ):
+ ) -> None:
"""Sets the handler callable that will be used to handle an incoming
federation EDU of the given type.
@@ -896,7 +895,7 @@ class FederationHandlerRegistry:
def register_query_handler(
self, query_type: str, handler: Callable[[dict], Awaitable[JsonDict]]
- ):
+ ) -> None:
"""Sets the handler callable that will be used to handle an incoming
federation query of the given type.
@@ -914,15 +913,17 @@ class FederationHandlerRegistry:
self.query_handlers[query_type] = handler
- def register_instance_for_edu(self, edu_type: str, instance_name: str):
+ def register_instance_for_edu(self, edu_type: str, instance_name: str) -> None:
"""Register that the EDU handler is on a different instance than master."""
self._edu_type_to_instance[edu_type] = [instance_name]
- def register_instances_for_edu(self, edu_type: str, instance_names: List[str]):
+ def register_instances_for_edu(
+ self, edu_type: str, instance_names: List[str]
+ ) -> None:
"""Register that the EDU handler is on multiple instances."""
self._edu_type_to_instance[edu_type] = instance_names
- async def on_edu(self, edu_type: str, origin: str, content: dict):
+ async def on_edu(self, edu_type: str, origin: str, content: dict) -> None:
if not self.config.use_presence and edu_type == EduTypes.Presence:
return
@@ -930,7 +931,9 @@ class FederationHandlerRegistry:
# the limit, drop them.
if (
edu_type == EduTypes.RoomKeyRequest
- and not self._room_key_request_rate_limiter.can_do_action(origin)
+ and not await self._room_key_request_rate_limiter.can_do_action(
+ None, origin
+ )
):
return
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 89df9a619b..e9c8a9f20a 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -29,6 +29,7 @@ from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
+from synapse.logging.opentracing import SynapseTags, set_tag
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import ReadReceipt
@@ -557,6 +558,13 @@ class PerDestinationQueue:
contents, stream_id = await self._store.get_new_device_msgs_for_remote(
self._destination, last_device_stream_id, to_device_stream_id, limit
)
+ for content in contents:
+ message_id = content.get("message_id")
+ if not message_id:
+ continue
+
+ set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+
edus = [
Edu(
origin=self._server_name,
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 84e39c5a46..5ef0556ef7 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -620,8 +620,8 @@ class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
PATH = "/exchange_third_party_invite/(?P<room_id>[^/]*)"
async def on_PUT(self, origin, content, query, room_id):
- content = await self.handler.on_exchange_third_party_invite_request(content)
- return 200, content
+ await self.handler.on_exchange_third_party_invite_request(content)
+ return 200, {}
class FederationClientKeysQueryServlet(BaseFederationServlet):
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index aade2c4a3a..fb899aa90d 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -49,7 +49,7 @@ class BaseHandler:
# The rate_hz and burst_count are overridden on a per-user basis
self.request_ratelimiter = Ratelimiter(
- clock=self.clock, rate_hz=0, burst_count=0
+ store=self.store, clock=self.clock, rate_hz=0, burst_count=0
)
self._rc_message = self.hs.config.rc_message
@@ -57,6 +57,7 @@ class BaseHandler:
# by the presence of rate limits in the config
if self.hs.config.rc_admin_redaction:
self.admin_redaction_ratelimiter = Ratelimiter(
+ store=self.store,
clock=self.clock,
rate_hz=self.hs.config.rc_admin_redaction.per_second,
burst_count=self.hs.config.rc_admin_redaction.burst_count,
@@ -91,11 +92,6 @@ class BaseHandler:
if app_service is not None:
return # do not ratelimit app service senders
- # Disable rate limiting of users belonging to any AS that is configured
- # not to be rate limited in its registration file (rate_limited: true|false).
- if requester.app_service and not requester.app_service.is_rate_limited():
- return
-
messages_per_second = self._rc_message.per_second
burst_count = self._rc_message.burst_count
@@ -113,11 +109,11 @@ class BaseHandler:
if is_admin_redaction and self.admin_redaction_ratelimiter:
# If we have separate config for admin redactions, use a separate
# ratelimiter as to not have user_ids clash
- self.admin_redaction_ratelimiter.ratelimit(user_id, update=update)
+ await self.admin_redaction_ratelimiter.ratelimit(requester, update=update)
else:
# Override rate and burst count per-user
- self.request_ratelimiter.ratelimit(
- user_id,
+ await self.request_ratelimiter.ratelimit(
+ requester,
rate_hz=messages_per_second,
burst_count=burst_count,
update=update,
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
index d781bb251d..bee1447c2e 100644
--- a/synapse/handlers/account_validity.py
+++ b/synapse/handlers/account_validity.py
@@ -18,7 +18,7 @@ import email.utils
import logging
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
-from typing import TYPE_CHECKING, List
+from typing import TYPE_CHECKING, List, Optional
from synapse.api.errors import StoreError, SynapseError
from synapse.logging.context import make_deferred_yieldable
@@ -241,7 +241,10 @@ class AccountValidityHandler:
return True
async def renew_account_for_user(
- self, user_id: str, expiration_ts: int = None, email_sent: bool = False
+ self,
+ user_id: str,
+ expiration_ts: Optional[int] = None,
+ email_sent: bool = False,
) -> int:
"""Renews the account attached to a given user by pushing back the
expiration date by the current validity period in the server's
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index d537ea8137..08e413bc98 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -238,6 +238,7 @@ class AuthHandler(BaseHandler):
# Ratelimiter for failed auth during UIA. Uses same ratelimit config
# as per `rc_login.failed_attempts`.
self._failed_uia_attempts_ratelimiter = Ratelimiter(
+ store=self.store,
clock=self.clock,
rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
@@ -248,6 +249,7 @@ class AuthHandler(BaseHandler):
# Ratelimitier for failed /login attempts
self._failed_login_attempts_ratelimiter = Ratelimiter(
+ store=self.store,
clock=hs.get_clock(),
rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
@@ -352,7 +354,7 @@ class AuthHandler(BaseHandler):
requester_user_id = requester.user.to_string()
# Check if we should be ratelimited due to too many previous failed attempts
- self._failed_uia_attempts_ratelimiter.ratelimit(requester_user_id, update=False)
+ await self._failed_uia_attempts_ratelimiter.ratelimit(requester, update=False)
# build a list of supported flows
supported_ui_auth_types = await self._get_available_ui_auth_types(
@@ -373,7 +375,9 @@ class AuthHandler(BaseHandler):
)
except LoginError:
# Update the ratelimiter to say we failed (`can_do_action` doesn't raise).
- self._failed_uia_attempts_ratelimiter.can_do_action(requester_user_id)
+ await self._failed_uia_attempts_ratelimiter.can_do_action(
+ requester,
+ )
raise
# find the completed login type
@@ -982,8 +986,8 @@ class AuthHandler(BaseHandler):
# We also apply account rate limiting using the 3PID as a key, as
# otherwise using 3PID bypasses the ratelimiting based on user ID.
if ratelimit:
- self._failed_login_attempts_ratelimiter.ratelimit(
- (medium, address), update=False
+ await self._failed_login_attempts_ratelimiter.ratelimit(
+ None, (medium, address), update=False
)
# Check for login providers that support 3pid login types
@@ -1016,8 +1020,8 @@ class AuthHandler(BaseHandler):
# this code path, which is fine as then the per-user ratelimit
# will kick in below.
if ratelimit:
- self._failed_login_attempts_ratelimiter.can_do_action(
- (medium, address)
+ await self._failed_login_attempts_ratelimiter.can_do_action(
+ None, (medium, address)
)
raise LoginError(403, "", errcode=Codes.FORBIDDEN)
@@ -1039,8 +1043,8 @@ class AuthHandler(BaseHandler):
# Check if we've hit the failed ratelimit (but don't update it)
if ratelimit:
- self._failed_login_attempts_ratelimiter.ratelimit(
- qualified_user_id.lower(), update=False
+ await self._failed_login_attempts_ratelimiter.ratelimit(
+ None, qualified_user_id.lower(), update=False
)
try:
@@ -1051,8 +1055,8 @@ class AuthHandler(BaseHandler):
# exception and masking the LoginError. The actual ratelimiting
# should have happened above.
if ratelimit:
- self._failed_login_attempts_ratelimiter.can_do_action(
- qualified_user_id.lower()
+ await self._failed_login_attempts_ratelimiter.can_do_action(
+ None, qualified_user_id.lower()
)
raise
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index eb547743be..c971eeb4d2 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -21,10 +21,10 @@ from synapse.api.errors import SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import (
+ SynapseTags,
get_active_span_text_map,
log_kv,
set_tag,
- start_active_span,
)
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
@@ -81,6 +81,7 @@ class DeviceMessageHandler:
)
self._ratelimiter = Ratelimiter(
+ store=self.store,
clock=hs.get_clock(),
rate_hz=hs.config.rc_key_requests.per_second,
burst_count=hs.config.rc_key_requests.burst_count,
@@ -182,7 +183,10 @@ class DeviceMessageHandler:
) -> None:
sender_user_id = requester.user.to_string()
- set_tag("number_of_messages", len(messages))
+ message_id = random_string(16)
+ set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+
+ log_kv({"number_of_to_device_messages": len(messages)})
set_tag("sender", sender_user_id)
local_messages = {}
remote_messages = {} # type: Dict[str, Dict[str, Dict[str, JsonDict]]]
@@ -191,8 +195,8 @@ class DeviceMessageHandler:
if (
message_type == EduTypes.RoomKeyRequest
and user_id != sender_user_id
- and self._ratelimiter.can_do_action(
- (sender_user_id, requester.device_id)
+ and await self._ratelimiter.can_do_action(
+ requester, (sender_user_id, requester.device_id)
)
):
continue
@@ -204,32 +208,35 @@ class DeviceMessageHandler:
"content": message_content,
"type": message_type,
"sender": sender_user_id,
+ "message_id": message_id,
}
for device_id, message_content in by_device.items()
}
if messages_by_device:
local_messages[user_id] = messages_by_device
+ log_kv(
+ {
+ "user_id": user_id,
+ "device_id": list(messages_by_device),
+ }
+ )
else:
destination = get_domain_from_id(user_id)
remote_messages.setdefault(destination, {})[user_id] = by_device
- message_id = random_string(16)
-
context = get_active_span_text_map()
remote_edu_contents = {}
for destination, messages in remote_messages.items():
- with start_active_span("to_device_for_user"):
- set_tag("destination", destination)
- remote_edu_contents[destination] = {
- "messages": messages,
- "sender": sender_user_id,
- "type": message_type,
- "message_id": message_id,
- "org.matrix.opentracing_context": json_encoder.encode(context),
- }
+ log_kv({"destination": destination})
+ remote_edu_contents[destination] = {
+ "messages": messages,
+ "sender": sender_user_id,
+ "type": message_type,
+ "message_id": message_id,
+ "org.matrix.opentracing_context": json_encoder.encode(context),
+ }
- log_kv({"local_messages": local_messages})
stream_id = await self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
)
@@ -238,7 +245,6 @@ class DeviceMessageHandler:
"to_device_key", stream_id, users=local_messages.keys()
)
- log_kv({"remote_messages": remote_messages})
if self.federation_sender:
for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 2ad9b6d930..739653a3fa 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -1008,7 +1008,7 @@ class E2eKeysHandler:
return signature_list, failures
async def _get_e2e_cross_signing_verify_key(
- self, user_id: str, key_type: str, from_user_id: str = None
+ self, user_id: str, key_type: str, from_user_id: Optional[str] = None
) -> Tuple[JsonDict, str, VerifyKey]:
"""Fetch locally or remotely query for a cross-signing public key.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 598a66f74c..5ea8a7b603 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -21,7 +21,17 @@ import itertools
import logging
from collections.abc import Container
from http import HTTPStatus
-from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Sequence, Tuple, Union
+from typing import (
+ TYPE_CHECKING,
+ Dict,
+ Iterable,
+ List,
+ Optional,
+ Sequence,
+ Set,
+ Tuple,
+ Union,
+)
import attr
from signedjson.key import decode_verify_key_bytes
@@ -171,15 +181,17 @@ class FederationHandler(BaseHandler):
self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
- async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
+ async def on_receive_pdu(
+ self, origin: str, pdu: EventBase, sent_to_us_directly: bool = False
+ ) -> None:
"""Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
Args:
- origin (str): server which initiated the /send/ transaction. Will
+ origin: server which initiated the /send/ transaction. Will
be used to fetch missing events or state.
- pdu (FrozenEvent): received PDU
- sent_to_us_directly (bool): True if this event was pushed to us; False if
+ pdu: received PDU
+ sent_to_us_directly: True if this event was pushed to us; False if
we pulled it as the result of a missing prev_event.
"""
@@ -411,13 +423,15 @@ class FederationHandler(BaseHandler):
await self._process_received_pdu(origin, pdu, state=state)
- async def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
+ async def _get_missing_events_for_pdu(
+ self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
+ ) -> None:
"""
Args:
- origin (str): Origin of the pdu. Will be called to get the missing events
+ origin: Origin of the pdu. Will be called to get the missing events
pdu: received pdu
- prevs (set(str)): List of event ids which we are missing
- min_depth (int): Minimum depth of events to return.
+ prevs: List of event ids which we are missing
+ min_depth: Minimum depth of events to return.
"""
room_id = pdu.room_id
@@ -778,7 +792,7 @@ class FederationHandler(BaseHandler):
origin: str,
event: EventBase,
state: Optional[Iterable[EventBase]],
- ):
+ ) -> None:
"""Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler.
@@ -887,7 +901,9 @@ class FederationHandler(BaseHandler):
logger.exception("Failed to resync device for %s", sender)
@log_function
- async def backfill(self, dest, room_id, limit, extremities):
+ async def backfill(
+ self, dest: str, room_id: str, limit: int, extremities: List[str]
+ ) -> List[EventBase]:
"""Trigger a backfill request to `dest` for the given `room_id`
This will attempt to get more events from the remote. If the other side
@@ -1142,16 +1158,15 @@ class FederationHandler(BaseHandler):
curr_state = await self.state_handler.get_current_state(room_id)
- def get_domains_from_state(state):
+ def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
"""Get joined domains from state
Args:
- state (dict[tuple, FrozenEvent]): State map from type/state
- key to event.
+ state: State map from type/state key to event.
Returns:
- list[tuple[str, int]]: Returns a list of servers with the
- lowest depth of their joins. Sorted by lowest depth first.
+ Returns a list of servers with the lowest depth of their joins.
+ Sorted by lowest depth first.
"""
joined_users = [
(state_key, int(event.depth))
@@ -1179,7 +1194,7 @@ class FederationHandler(BaseHandler):
domain for domain, depth in curr_domains if domain != self.server_name
]
- async def try_backfill(domains):
+ async def try_backfill(domains: List[str]) -> bool:
# TODO: Should we try multiple of these at a time?
for dom in domains:
try:
@@ -1258,21 +1273,25 @@ class FederationHandler(BaseHandler):
}
for e_id, _ in sorted_extremeties_tuple:
- likely_domains = get_domains_from_state(states[e_id])
+ likely_extremeties_domains = get_domains_from_state(states[e_id])
success = await try_backfill(
- [dom for dom, _ in likely_domains if dom not in tried_domains]
+ [
+ dom
+ for dom, _ in likely_extremeties_domains
+ if dom not in tried_domains
+ ]
)
if success:
return True
- tried_domains.update(dom for dom, _ in likely_domains)
+ tried_domains.update(dom for dom, _ in likely_extremeties_domains)
return False
async def _get_events_and_persist(
self, destination: str, room_id: str, events: Iterable[str]
- ):
+ ) -> None:
"""Fetch the given events from a server, and persist them as outliers.
This function *does not* recursively get missing auth events of the
@@ -1348,7 +1367,7 @@ class FederationHandler(BaseHandler):
event_infos,
)
- def _sanity_check_event(self, ev):
+ def _sanity_check_event(self, ev: EventBase) -> None:
"""
Do some early sanity checks of a received event
@@ -1357,9 +1376,7 @@ class FederationHandler(BaseHandler):
or cascade of event fetches.
Args:
- ev (synapse.events.EventBase): event to be checked
-
- Returns: None
+ ev: event to be checked
Raises:
SynapseError if the event does not pass muster
@@ -1380,7 +1397,7 @@ class FederationHandler(BaseHandler):
)
raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
- async def send_invite(self, target_host, event):
+ async def send_invite(self, target_host: str, event: EventBase) -> EventBase:
"""Sends the invite to the remote server for signing.
Invites must be signed by the invitee's server before distribution.
@@ -1528,12 +1545,13 @@ class FederationHandler(BaseHandler):
run_in_background(self._handle_queued_pdus, room_queue)
- async def _handle_queued_pdus(self, room_queue):
+ async def _handle_queued_pdus(
+ self, room_queue: List[Tuple[EventBase, str]]
+ ) -> None:
"""Process PDUs which got queued up while we were busy send_joining.
Args:
- room_queue (list[FrozenEvent, str]): list of PDUs to be processed
- and the servers that sent them
+ room_queue: list of PDUs to be processed and the servers that sent them
"""
for p, origin in room_queue:
try:
@@ -1612,7 +1630,7 @@ class FederationHandler(BaseHandler):
return event
- async def on_send_join_request(self, origin, pdu):
+ async def on_send_join_request(self, origin: str, pdu: EventBase) -> JsonDict:
"""We have received a join event for a room. Fully process it and
respond with the current state and auth chains.
"""
@@ -1668,7 +1686,7 @@ class FederationHandler(BaseHandler):
async def on_invite_request(
self, origin: str, event: EventBase, room_version: RoomVersion
- ):
+ ) -> EventBase:
"""We've got an invite event. Process and persist it. Sign it.
Respond with the now signed event.
@@ -1711,7 +1729,7 @@ class FederationHandler(BaseHandler):
member_handler = self.hs.get_room_member_handler()
# We don't rate limit based on room ID, as that should be done by
# sending server.
- member_handler.ratelimit_invite(None, event.state_key)
+ await member_handler.ratelimit_invite(None, None, event.state_key)
# keep a record of the room version, if we don't yet know it.
# (this may get overwritten if we later get a different room version in a
@@ -1841,7 +1859,7 @@ class FederationHandler(BaseHandler):
return event
- async def on_send_leave_request(self, origin, pdu):
+ async def on_send_leave_request(self, origin: str, pdu: EventBase) -> None:
""" We have received a leave event for a room. Fully process it."""
event = pdu
@@ -1969,12 +1987,17 @@ class FederationHandler(BaseHandler):
else:
return None
- async def get_min_depth_for_context(self, context):
+ async def get_min_depth_for_context(self, context: str) -> int:
return await self.store.get_min_depth(context)
async def _handle_new_event(
- self, origin, event, state=None, auth_events=None, backfilled=False
- ):
+ self,
+ origin: str,
+ event: EventBase,
+ state: Optional[Iterable[EventBase]] = None,
+ auth_events: Optional[MutableStateMap[EventBase]] = None,
+ backfilled: bool = False,
+ ) -> EventContext:
context = await self._prep_event(
origin, event, state=state, auth_events=auth_events, backfilled=backfilled
)
@@ -2280,40 +2303,14 @@ class FederationHandler(BaseHandler):
logger.warning("Soft-failing %r because %s", event, e)
event.internal_metadata.soft_failed = True
- async def on_query_auth(
- self, origin, event_id, room_id, remote_auth_chain, rejects, missing
- ):
- in_room = await self.auth.check_host_in_room(room_id, origin)
- if not in_room:
- raise AuthError(403, "Host not in room.")
-
- event = await self.store.get_event(event_id, check_room_id=room_id)
-
- # Just go through and process each event in `remote_auth_chain`. We
- # don't want to fall into the trap of `missing` being wrong.
- for e in remote_auth_chain:
- try:
- await self._handle_new_event(origin, e)
- except AuthError:
- pass
-
- # Now get the current auth_chain for the event.
- local_auth_chain = await self.store.get_auth_chain(
- room_id, list(event.auth_event_ids()), include_given=True
- )
-
- # TODO: Check if we would now reject event_id. If so we need to tell
- # everyone.
-
- ret = await self.construct_auth_difference(local_auth_chain, remote_auth_chain)
-
- logger.debug("on_query_auth returning: %s", ret)
-
- return ret
-
async def on_get_missing_events(
- self, origin, room_id, earliest_events, latest_events, limit
- ):
+ self,
+ origin: str,
+ room_id: str,
+ earliest_events: List[str],
+ latest_events: List[str],
+ limit: int,
+ ) -> List[EventBase]:
in_room = await self.auth.check_host_in_room(room_id, origin)
if not in_room:
raise AuthError(403, "Host not in room.")
@@ -2617,8 +2614,8 @@ class FederationHandler(BaseHandler):
assumes that we have already processed all events in remote_auth
Params:
- local_auth (list)
- remote_auth (list)
+ local_auth
+ remote_auth
Returns:
dict
@@ -2742,8 +2739,8 @@ class FederationHandler(BaseHandler):
@log_function
async def exchange_third_party_invite(
- self, sender_user_id, target_user_id, room_id, signed
- ):
+ self, sender_user_id: str, target_user_id: str, room_id: str, signed: JsonDict
+ ) -> None:
third_party_invite = {"signed": signed}
event_dict = {
@@ -2835,8 +2832,12 @@ class FederationHandler(BaseHandler):
await member_handler.send_membership_event(None, event, context)
async def add_display_name_to_third_party_invite(
- self, room_version, event_dict, event, context
- ):
+ self,
+ room_version: str,
+ event_dict: JsonDict,
+ event: EventBase,
+ context: EventContext,
+ ) -> Tuple[EventBase, EventContext]:
key = (
EventTypes.ThirdPartyInvite,
event.content["third_party_invite"]["signed"]["token"],
@@ -2872,13 +2873,13 @@ class FederationHandler(BaseHandler):
EventValidator().validate_new(event, self.config)
return (event, context)
- async def _check_signature(self, event, context):
+ async def _check_signature(self, event: EventBase, context: EventContext) -> None:
"""
Checks that the signature in the event is consistent with its invite.
Args:
- event (Event): The m.room.member event to check
- context (EventContext):
+ event: The m.room.member event to check
+ context:
Raises:
AuthError: if signature didn't match any keys, or key has been
@@ -2964,13 +2965,13 @@ class FederationHandler(BaseHandler):
raise last_exception
- async def _check_key_revocation(self, public_key, url):
+ async def _check_key_revocation(self, public_key: str, url: str) -> None:
"""
Checks whether public_key has been revoked.
Args:
- public_key (str): base-64 encoded public key.
- url (str): Key revocation URL.
+ public_key: base-64 encoded public key.
+ url: Key revocation URL.
Raises:
AuthError: if they key has been revoked.
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 5f346f6d6d..d89fa5fb30 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -61,17 +61,19 @@ class IdentityHandler(BaseHandler):
# Ratelimiters for `/requestToken` endpoints.
self._3pid_validation_ratelimiter_ip = Ratelimiter(
+ store=self.store,
clock=hs.get_clock(),
rate_hz=hs.config.ratelimiting.rc_3pid_validation.per_second,
burst_count=hs.config.ratelimiting.rc_3pid_validation.burst_count,
)
self._3pid_validation_ratelimiter_address = Ratelimiter(
+ store=self.store,
clock=hs.get_clock(),
rate_hz=hs.config.ratelimiting.rc_3pid_validation.per_second,
burst_count=hs.config.ratelimiting.rc_3pid_validation.burst_count,
)
- def ratelimit_request_token_requests(
+ async def ratelimit_request_token_requests(
self,
request: SynapseRequest,
medium: str,
@@ -85,8 +87,12 @@ class IdentityHandler(BaseHandler):
address: The actual threepid ID, e.g. the phone number or email address
"""
- self._3pid_validation_ratelimiter_ip.ratelimit((medium, request.getClientIP()))
- self._3pid_validation_ratelimiter_address.ratelimit((medium, address))
+ await self._3pid_validation_ratelimiter_ip.ratelimit(
+ None, (medium, request.getClientIP())
+ )
+ await self._3pid_validation_ratelimiter_address.ratelimit(
+ None, (medium, address)
+ )
async def threepid_from_creds(
self, id_server: str, creds: Dict[str, str]
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 1b7c065b34..6069968f7f 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -385,7 +385,7 @@ class EventCreationHandler:
self._events_shard_config = self.config.worker.events_shard_config
self._instance_name = hs.get_instance_name()
- self.room_invite_state_types = self.hs.config.room_invite_state_types
+ self.room_invite_state_types = self.hs.config.api.room_prejoin_state
self.membership_types_to_include_profile_data_in = (
{Membership.JOIN, Membership.INVITE}
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 0fc2bf15d5..9701b76d0f 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -204,7 +204,7 @@ class RegistrationHandler(BaseHandler):
Raises:
SynapseError if there was a problem registering.
"""
- self.check_registration_ratelimit(address)
+ await self.check_registration_ratelimit(address)
result = await self.spam_checker.check_registration_for_spam(
threepid,
@@ -583,7 +583,7 @@ class RegistrationHandler(BaseHandler):
errcode=Codes.EXCLUSIVE,
)
- def check_registration_ratelimit(self, address: Optional[str]) -> None:
+ async def check_registration_ratelimit(self, address: Optional[str]) -> None:
"""A simple helper method to check whether the registration rate limit has been hit
for a given IP address
@@ -597,7 +597,7 @@ class RegistrationHandler(BaseHandler):
if not address:
return
- self.ratelimiter.ratelimit(address)
+ await self.ratelimiter.ratelimit(None, address)
async def register_with_store(
self,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 4d20ed8357..1cf12f3255 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -75,22 +75,26 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self.allow_per_room_profiles = self.config.allow_per_room_profiles
self._join_rate_limiter_local = Ratelimiter(
+ store=self.store,
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
burst_count=hs.config.ratelimiting.rc_joins_local.burst_count,
)
self._join_rate_limiter_remote = Ratelimiter(
+ store=self.store,
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second,
burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count,
)
self._invites_per_room_limiter = Ratelimiter(
+ store=self.store,
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_invites_per_room.per_second,
burst_count=hs.config.ratelimiting.rc_invites_per_room.burst_count,
)
self._invites_per_user_limiter = Ratelimiter(
+ store=self.store,
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_invites_per_user.per_second,
burst_count=hs.config.ratelimiting.rc_invites_per_user.burst_count,
@@ -159,15 +163,20 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
async def forget(self, user: UserID, room_id: str) -> None:
raise NotImplementedError()
- def ratelimit_invite(self, room_id: Optional[str], invitee_user_id: str):
+ async def ratelimit_invite(
+ self,
+ requester: Optional[Requester],
+ room_id: Optional[str],
+ invitee_user_id: str,
+ ):
"""Ratelimit invites by room and by target user.
If room ID is missing then we just rate limit by target user.
"""
if room_id:
- self._invites_per_room_limiter.ratelimit(room_id)
+ await self._invites_per_room_limiter.ratelimit(requester, room_id)
- self._invites_per_user_limiter.ratelimit(invitee_user_id)
+ await self._invites_per_user_limiter.ratelimit(requester, invitee_user_id)
async def _local_membership_update(
self,
@@ -237,7 +246,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
(
allowed,
time_allowed,
- ) = self._join_rate_limiter_local.can_requester_do_action(requester)
+ ) = await self._join_rate_limiter_local.can_do_action(requester)
if not allowed:
raise LimitExceededError(
@@ -421,9 +430,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if effective_membership_state == Membership.INVITE:
target_id = target.to_string()
if ratelimit:
- # Don't ratelimit application services.
- if not requester.app_service or requester.app_service.is_rate_limited():
- self.ratelimit_invite(room_id, target_id)
+ await self.ratelimit_invite(requester, room_id, target_id)
# block any attempts to invite the server notices mxid
if target_id == self._server_notices_mxid:
@@ -534,7 +541,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
(
allowed,
time_allowed,
- ) = self._join_rate_limiter_remote.can_requester_do_action(
+ ) = await self._join_rate_limiter_remote.can_do_action(
requester,
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index ee607e6e65..7b356ba7e5 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -24,6 +24,7 @@ from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.api.filtering import FilterCollection
from synapse.events import EventBase
from synapse.logging.context import current_context
+from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
@@ -340,7 +341,14 @@ class SyncHandler:
full_state: bool = False,
) -> SyncResult:
"""Get the sync for client needed to match what the server has now."""
- return await self.generate_sync_result(sync_config, since_token, full_state)
+ with start_active_span("current_sync_for_user"):
+ log_kv({"since_token": since_token})
+ sync_result = await self.generate_sync_result(
+ sync_config, since_token, full_state
+ )
+
+ set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
+ return sync_result
async def push_rules_for_user(self, user: UserID) -> JsonDict:
user_id = user.to_string()
@@ -964,6 +972,7 @@ class SyncHandler:
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = self.event_sources.get_current_token()
+ log_kv({"now_token": now_token})
logger.debug(
"Calculating sync response for %r between %s and %s",
@@ -1225,6 +1234,13 @@ class SyncHandler:
user_id, device_id, since_stream_id, now_token.to_device_key
)
+ for message in messages:
+ # We pop here as we shouldn't be sending the message ID down
+ # `/sync`
+ message_id = message.pop("message_id", None)
+ if message_id:
+ set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+
logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d)",
len(messages),
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 096d199f4c..bb35af099d 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -19,7 +19,10 @@ from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
from synapse.api.errors import AuthError, ShadowBanError, SynapseError
from synapse.appservice import ApplicationService
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+ run_as_background_process,
+ wrap_as_background_process,
+)
from synapse.replication.tcp.streams import TypingStream
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -86,6 +89,7 @@ class FollowerTypingHandler:
self._member_last_federation_poke = {}
self.wheel_timer = WheelTimer(bucket_size=5000)
+ @wrap_as_background_process("typing._handle_timeouts")
def _handle_timeouts(self) -> None:
logger.debug("Checking for typing timeouts")
diff --git a/synapse/http/client.py b/synapse/http/client.py
index a0caba84e4..e691ba6d88 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -590,7 +590,7 @@ class SimpleHttpClient:
uri: str,
json_body: Any,
args: Optional[QueryParams] = None,
- headers: RawHeaders = None,
+ headers: Optional[RawHeaders] = None,
) -> Any:
"""Puts some json to the given URI.
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index aa146e8bb8..b8081f197e 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -259,6 +259,14 @@ except ImportError:
logger = logging.getLogger(__name__)
+class SynapseTags:
+ # The message ID of any to_device message processed
+ TO_DEVICE_MESSAGE_ID = "to_device.message_id"
+
+ # Whether the sync response has new data to be returned to the client.
+ SYNC_RESULT = "sync.new_data"
+
+
# Block everything by default
# A regex which matches the server_names to expose traces for.
# None means 'block everything'.
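These tags are what the `devicemessage`, `per_destination_queue` and `sync` hunks in this patch set on their active spans. An illustrative snippet (not from the patch):

```python
from synapse.logging.opentracing import SynapseTags, set_tag, start_active_span
from synapse.util.stringutils import random_string

message_id = random_string(16)
with start_active_span("to_device_example"):
    # Lets a single to-device message be correlated from the client's send
    # request, through the federation sender, to its delivery via /sync.
    set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
```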
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 1374aae490..c178db57e3 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -39,6 +39,7 @@ from synapse.api.errors import AuthError
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import PreserveLoggingContext
+from synapse.logging.opentracing import log_kv, start_active_span
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
@@ -136,6 +137,15 @@ class _NotifierUserStream:
self.last_notified_ms = time_now_ms
noify_deferred = self.notify_deferred
+ log_kv(
+ {
+ "notify": self.user_id,
+ "stream": stream_key,
+ "stream_id": stream_id,
+ "listeners": self.count_listeners(),
+ }
+ )
+
users_woken_by_stream_counter.labels(stream_key).inc()
with PreserveLoggingContext():
@@ -404,6 +414,13 @@ class Notifier:
with Measure(self.clock, "on_new_event"):
user_streams = set()
+ log_kv(
+ {
+ "waking_up_explicit_users": len(users),
+ "waking_up_explicit_rooms": len(rooms),
+ }
+ )
+
for user in users:
user_stream = self.user_to_user_stream.get(str(user))
if user_stream is not None:
@@ -476,12 +493,34 @@ class Notifier:
(end_time - now) / 1000.0,
self.hs.get_reactor(),
)
- with PreserveLoggingContext():
- await listener.deferred
+
+ with start_active_span("wait_for_events.deferred"):
+ log_kv(
+ {
+ "wait_for_events": "sleep",
+ "token": prev_token,
+ }
+ )
+
+ with PreserveLoggingContext():
+ await listener.deferred
+
+ log_kv(
+ {
+ "wait_for_events": "woken",
+ "token": user_stream.current_token,
+ }
+ )
current_token = user_stream.current_token
result = await callback(prev_token, current_token)
+ log_kv(
+ {
+ "wait_for_events": "result",
+ "result": bool(result),
+ }
+ )
if result:
break
@@ -489,8 +528,10 @@ class Notifier:
# has happened between the old prev_token and the current_token
prev_token = current_token
except defer.TimeoutError:
+ log_kv({"wait_for_events": "timeout"})
break
except defer.CancelledError:
+ log_kv({"wait_for_events": "cancelled"})
break
if result is None:
@@ -507,7 +548,7 @@ class Notifier:
pagination_config: PaginationConfig,
timeout: int,
is_guest: bool = False,
- explicit_room_id: str = None,
+ explicit_room_id: Optional[str] = None,
) -> EventStreamResult:
"""For the given user and rooms, return any new events for them. If
there are no new events wait for up to `timeout` milliseconds for any
diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py
index d005f38767..73d7477854 100644
--- a/synapse/replication/http/register.py
+++ b/synapse/replication/http/register.py
@@ -77,7 +77,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
async def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request)
- self.registration_handler.check_registration_ratelimit(content["address"])
+ await self.registration_handler.check_registration_ratelimit(content["address"])
await self.registration_handler.register_with_store(
user_id=user_id,
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 2f4d407f94..98bdeb0ec6 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -60,7 +60,7 @@ class ConstantProperty(Generic[T, V]):
constant = attr.ib() # type: V
- def __get__(self, obj: Optional[T], objtype: Type[T] = None) -> V:
+ def __get__(self, obj: Optional[T], objtype: Optional[Type[T]] = None) -> V:
return self.constant
def __set__(self, obj: Optional[T], value: V):
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 309bd2771b..fa7804583a 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -36,6 +36,7 @@ from synapse.rest.admin._base import (
)
from synapse.rest.client.v2_alpha._base import client_patterns
from synapse.storage.databases.main.media_repository import MediaSortOrder
+from synapse.storage.databases.main.stats import UserSortOrder
from synapse.types import JsonDict, UserID
if TYPE_CHECKING:
@@ -117,8 +118,26 @@ class UsersRestServletV2(RestServlet):
guests = parse_boolean(request, "guests", default=True)
deactivated = parse_boolean(request, "deactivated", default=False)
+ order_by = parse_string(
+ request,
+ "order_by",
+ default=UserSortOrder.NAME.value,
+ allowed_values=(
+ UserSortOrder.NAME.value,
+ UserSortOrder.DISPLAYNAME.value,
+ UserSortOrder.GUEST.value,
+ UserSortOrder.ADMIN.value,
+ UserSortOrder.DEACTIVATED.value,
+ UserSortOrder.USER_TYPE.value,
+ UserSortOrder.AVATAR_URL.value,
+ UserSortOrder.SHADOW_BANNED.value,
+ ),
+ )
+
+ direction = parse_string(request, "dir", default="f", allowed_values=("f", "b"))
+
users, total = await self.store.get_users_paginate(
- start, limit, user_id, name, guests, deactivated
+ start, limit, user_id, name, guests, deactivated, order_by, direction
)
ret = {"users": users, "total": total}
if (start + limit) < total:
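With the new `order_by` and `dir` parameters, the list-users admin API can be paginated in a chosen sort order. A hedged example request follows; the endpoint path, host, and token are assumptions, not part of this diff:

    # Illustrative client call; endpoint path and credentials are assumptions.
    import requests

    resp = requests.get(
        "https://homeserver.example.com/_synapse/admin/v2/users",
        params={"order_by": "displayname", "dir": "b", "from": 0, "limit": 10},
        headers={"Authorization": "Bearer <admin access token>"},
    )
    # The user_id is returned in the `name` field of each entry.
    for user in resp.json()["users"]:
        print(user["name"], user.get("displayname"))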
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index e4c352f572..3151e72d4f 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -74,11 +74,13 @@ class LoginRestServlet(RestServlet):
self._well_known_builder = WellKnownBuilder(hs)
self._address_ratelimiter = Ratelimiter(
+ store=hs.get_datastore(),
clock=hs.get_clock(),
rate_hz=self.hs.config.rc_login_address.per_second,
burst_count=self.hs.config.rc_login_address.burst_count,
)
self._account_ratelimiter = Ratelimiter(
+ store=hs.get_datastore(),
clock=hs.get_clock(),
rate_hz=self.hs.config.rc_login_account.per_second,
burst_count=self.hs.config.rc_login_account.burst_count,
@@ -141,20 +143,22 @@ class LoginRestServlet(RestServlet):
appservice = self.auth.get_appservice_by_req(request)
if appservice.is_rate_limited():
- self._address_ratelimiter.ratelimit(request.getClientIP())
+ await self._address_ratelimiter.ratelimit(
+ None, request.getClientIP()
+ )
result = await self._do_appservice_login(login_submission, appservice)
elif self.jwt_enabled and (
login_submission["type"] == LoginRestServlet.JWT_TYPE
or login_submission["type"] == LoginRestServlet.JWT_TYPE_DEPRECATED
):
- self._address_ratelimiter.ratelimit(request.getClientIP())
+ await self._address_ratelimiter.ratelimit(None, request.getClientIP())
result = await self._do_jwt_login(login_submission)
elif login_submission["type"] == LoginRestServlet.TOKEN_TYPE:
- self._address_ratelimiter.ratelimit(request.getClientIP())
+ await self._address_ratelimiter.ratelimit(None, request.getClientIP())
result = await self._do_token_login(login_submission)
else:
- self._address_ratelimiter.ratelimit(request.getClientIP())
+ await self._address_ratelimiter.ratelimit(None, request.getClientIP())
result = await self._do_other_login(login_submission)
except KeyError:
raise SynapseError(400, "Missing JSON keys.")
@@ -258,7 +262,7 @@ class LoginRestServlet(RestServlet):
# too often. This happens here rather than before as we don't
# necessarily know the user before now.
if ratelimit:
- self._account_ratelimiter.ratelimit(user_id.lower())
+ await self._account_ratelimiter.ratelimit(None, user_id.lower())
if create_non_existent_users:
canonical_uid = await self.auth_handler.check_user_exists(user_id)
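The calls above show the new calling convention: `Ratelimiter` is now constructed with the datastore, and `ratelimit` is a coroutine whose first argument is the `Requester` (or `None` when there is no authenticated requester yet), followed by an explicit key. A minimal sketch, with illustrative rate values:

    # Minimal sketch of the new async Ratelimiter usage (rates illustrative).
    limiter = Ratelimiter(
        store=hs.get_datastore(),
        clock=hs.get_clock(),
        rate_hz=0.17,
        burst_count=3,
    )

    async def limit_by_ip(request) -> None:
        # No authenticated requester yet, so pass None and key on the client IP.
        await limiter.ratelimit(None, request.getClientIP())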
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index c2ba790bab..411fb57c47 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -103,7 +103,9 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
# Raise if the provided next_link value isn't valid
assert_valid_next_link(self.hs, next_link)
- self.identity_handler.ratelimit_request_token_requests(request, "email", email)
+ await self.identity_handler.ratelimit_request_token_requests(
+ request, "email", email
+ )
# The email will be sent to the stored address.
# This avoids a potential account hijack by requesting a password reset to
@@ -387,7 +389,9 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
Codes.THREEPID_DENIED,
)
- self.identity_handler.ratelimit_request_token_requests(request, "email", email)
+ await self.identity_handler.ratelimit_request_token_requests(
+ request, "email", email
+ )
if next_link:
# Raise if the provided next_link value isn't valid
@@ -468,7 +472,7 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
Codes.THREEPID_DENIED,
)
- self.identity_handler.ratelimit_request_token_requests(
+ await self.identity_handler.ratelimit_request_token_requests(
request, "msisdn", msisdn
)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 8f68d8dfc8..c212da0cb2 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -126,7 +126,9 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
Codes.THREEPID_DENIED,
)
- self.identity_handler.ratelimit_request_token_requests(request, "email", email)
+ await self.identity_handler.ratelimit_request_token_requests(
+ request, "email", email
+ )
existing_user_id = await self.hs.get_datastore().get_user_id_by_threepid(
"email", email
@@ -208,7 +210,7 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet):
Codes.THREEPID_DENIED,
)
- self.identity_handler.ratelimit_request_token_requests(
+ await self.identity_handler.ratelimit_request_token_requests(
request, "msisdn", msisdn
)
@@ -406,7 +408,7 @@ class RegisterRestServlet(RestServlet):
client_addr = request.getClientIP()
- self.ratelimiter.ratelimit(client_addr, update=False)
+ await self.ratelimiter.ratelimit(None, client_addr, update=False)
kind = b"user"
if b"kind" in request.args:
diff --git a/synapse/server.py b/synapse/server.py
index e85b9391fa..e42f7b1a18 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -329,6 +329,7 @@ class HomeServer(metaclass=abc.ABCMeta):
@cache_in_self
def get_registration_ratelimiter(self) -> Ratelimiter:
return Ratelimiter(
+ store=self.get_datastore(),
clock=self.get_clock(),
rate_hz=self.config.rc_registration.per_second,
burst_count=self.config.rc_registration.burst_count,
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 1d44c3aa2c..b3d16ca7ac 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -21,6 +21,7 @@ from typing import List, Optional, Tuple
from synapse.api.constants import PresenceState
from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import DatabasePool
+from synapse.storage.databases.main.stats import UserSortOrder
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import (
IdGenerator,
@@ -292,6 +293,8 @@ class DataStore(
name: Optional[str] = None,
guests: bool = True,
deactivated: bool = False,
+ order_by: UserSortOrder = UserSortOrder.USER_ID.value,
+ direction: str = "f",
) -> Tuple[List[JsonDict], int]:
"""Function to retrieve a paginated list of users from
users list. This will return a json list of users and the
@@ -304,6 +307,8 @@ class DataStore(
name: search for local part of user_id or display name
guests: whether to in include guest users
deactivated: whether to include deactivated users
+ order_by: the sort order of the returned list
+ direction: sort ascending or descending
Returns:
A tuple of a list of mappings from user to information and a count of total users.
"""
@@ -312,6 +317,14 @@ class DataStore(
filters = []
args = [self.hs.config.server_name]
+ # Set ordering
+ order_by_column = UserSortOrder(order_by).value
+
+ if direction == "b":
+ order = "DESC"
+ else:
+ order = "ASC"
+
# `name` is in database already in lower case
if name:
filters.append("(name LIKE ? OR LOWER(displayname) LIKE ?)")
@@ -339,10 +352,15 @@ class DataStore(
txn.execute(sql, args)
count = txn.fetchone()[0]
- sql = (
- "SELECT name, user_type, is_guest, admin, deactivated, shadow_banned, displayname, avatar_url "
- + sql_base
- + " ORDER BY u.name LIMIT ? OFFSET ?"
+ sql = """
+ SELECT name, user_type, is_guest, admin, deactivated, shadow_banned, displayname, avatar_url
+ {sql_base}
+ ORDER BY {order_by_column} {order}, u.name ASC
+ LIMIT ? OFFSET ?
+ """.format(
+ sql_base=sql_base,
+ order_by_column=order_by_column,
+ order=order,
)
args += [limit, start]
txn.execute(sql, args)
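The ordering clause is derived directly from the validated `order_by` and `direction` values. A small sketch of that derivation, mirroring the logic above (the helper name is illustrative):

    # Sketch of how the ORDER BY clause is assembled (helper name illustrative).
    from synapse.storage.databases.main.stats import UserSortOrder

    def build_order_clause(order_by: str, direction: str) -> str:
        order_by_column = UserSortOrder(order_by).value
        order = "DESC" if direction == "b" else "ASC"
        # u.name is kept as a secondary key for a stable, deterministic order.
        return "ORDER BY %s %s, u.name ASC" % (order_by_column, order)

    # build_order_clause("displayname", "b") -> "ORDER BY displayname DESC, u.name ASC"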
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 952d4969b2..c00780969f 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -16,7 +16,7 @@
import logging
import threading
from collections import namedtuple
-from typing import Dict, Iterable, List, Optional, Tuple, overload
+from typing import Container, Dict, Iterable, List, Optional, Tuple, overload
from constantly import NamedConstant, Names
from typing_extensions import Literal
@@ -544,7 +544,7 @@ class EventsWorkerStore(SQLBaseStore):
async def get_stripped_room_state_from_event_context(
self,
context: EventContext,
- state_types_to_include: List[EventTypes],
+ state_types_to_include: Container[str],
membership_user_id: Optional[str] = None,
) -> List[JsonDict]:
"""
diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py
index ac07e0197b..8f462dfc31 100644
--- a/synapse/storage/databases/main/group_server.py
+++ b/synapse/storage/databases/main/group_server.py
@@ -1027,8 +1027,8 @@ class GroupServerStore(GroupServerWorkerStore):
user_id: str,
is_admin: bool = False,
is_public: bool = True,
- local_attestation: dict = None,
- remote_attestation: dict = None,
+ local_attestation: Optional[dict] = None,
+ remote_attestation: Optional[dict] = None,
) -> None:
"""Add a user to the group server.
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 1c99393c65..bce8946c21 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -66,18 +66,37 @@ TYPE_TO_ORIGIN_TABLE = {"room": ("rooms", "room_id"), "user": ("users", "name")}
class UserSortOrder(Enum):
"""
Enum to define the sorting method used when returning users
- with get_users_media_usage_paginate
+ with get_users_paginate in __init__.py
+ and get_users_media_usage_paginate in stats.py
- MEDIA_LENGTH = ordered by size of uploaded media. Smallest to largest.
- MEDIA_COUNT = ordered by number of uploaded media. Smallest to largest.
+ Note: this enum cannot be moved to __init__.py, as doing so raises
+ `builtins.ImportError` (`most likely due to a circular import`).
+
+ MEDIA_LENGTH = ordered by size of uploaded media.
+ MEDIA_COUNT = ordered by number of uploaded media.
USER_ID = ordered alphabetically by `user_id`.
+ NAME = ordered alphabetically by `user_id`. This is for compatibility reasons,
+ as the user_id is returned in the `name` field of the list users admin API response.
DISPLAYNAME = ordered alphabetically by `displayname`
+ GUEST = ordered by `is_guest`
+ ADMIN = ordered by `admin`
+ DEACTIVATED = ordered by `deactivated`
+ USER_TYPE = ordered alphabetically by `user_type`
+ AVATAR_URL = ordered alphabetically by `avatar_url`
+ SHADOW_BANNED = ordered by `shadow_banned`
"""
MEDIA_LENGTH = "media_length"
MEDIA_COUNT = "media_count"
USER_ID = "user_id"
+ NAME = "name"
DISPLAYNAME = "displayname"
+ GUEST = "is_guest"
+ ADMIN = "admin"
+ DEACTIVATED = "deactivated"
+ USER_TYPE = "user_type"
+ AVATAR_URL = "avatar_url"
+ SHADOW_BANNED = "shadow_banned"
class StatsStore(StateDeltasStore):
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 6c3c2da520..c7f0b8ccb5 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -13,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import imp
+import importlib.util
import logging
import os
import re
@@ -454,8 +454,13 @@ def _upgrade_existing_database(
)
module_name = "synapse.storage.v%d_%s" % (v, root_name)
- with open(absolute_path) as python_file:
- module = imp.load_source(module_name, absolute_path, python_file) # type: ignore
+
+ spec = importlib.util.spec_from_file_location(
+ module_name, absolute_path
+ )
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module) # type: ignore
+
logger.info("Running script %s", relative_path)
module.run_create(cur, database_engine) # type: ignore
if not is_empty:
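The deprecated `imp.load_source` call is replaced with the standard `importlib.util` recipe for loading a module from an arbitrary file path. A self-contained sketch of that recipe (the module name and path are placeholders):

    # Standalone sketch of the importlib.util pattern used above.
    import importlib.util

    def load_module_from_path(module_name: str, path: str):
        spec = importlib.util.spec_from_file_location(module_name, path)
        if spec is None or spec.loader is None:
            raise ImportError("Could not load %s from %s" % (module_name, path))
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module

    # e.g. delta = load_module_from_path("synapse.storage.v58_mydelta", "/path/to/delta.py")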
diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py
index 1adc92eb90..dd392cf694 100644
--- a/synapse/util/caches/deferred_cache.py
+++ b/synapse/util/caches/deferred_cache.py
@@ -283,7 +283,9 @@ class DeferredCache(Generic[KT, VT]):
# we return a new Deferred which will be called before any subsequent observers.
return observable.observe()
- def prefill(self, key: KT, value: VT, callback: Callable[[], None] = None):
+ def prefill(
+ self, key: KT, value: VT, callback: Optional[Callable[[], None]] = None
+ ):
callbacks = [callback] if callback else []
self.cache.set(key, value, callbacks=callbacks)
|