diff options
Diffstat (limited to 'synapse')
73 files changed, 1198 insertions, 635 deletions
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index de7c56bc0f..82aeef8d19 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -128,20 +128,7 @@ USER_FILTER_SCHEMA = { "account_data": {"$ref": "#/definitions/filter"}, "room": {"$ref": "#/definitions/room_filter"}, "event_format": {"type": "string", "enum": ["client", "federation"]}, - "event_fields": { - "type": "array", - "items": { - "type": "string", - # Don't allow '\\' in event field filters. This makes matching - # events a lot easier as we can then use a negative lookbehind - # assertion to split '\.' If we allowed \\ then it would - # incorrectly split '\\.' See synapse.events.utils.serialize_event - # - # Note that because this is a regular expression, we have to escape - # each backslash in the pattern. - "pattern": r"^((?!\\\\).)*$", - }, - }, + "event_fields": {"type": "array", "items": {"type": "string"}}, }, "additionalProperties": True, # Allow new fields for forward compatibility } diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py index 7030b133d3..035a14171b 100644 --- a/synapse/api/room_versions.py +++ b/synapse/api/room_versions.py @@ -485,6 +485,30 @@ class RoomVersions: msc3931_push_features=(), msc3989_redaction_rules=True, ) + MSC3820opt2 = RoomVersion( + # Based upon v10 + "org.matrix.msc3820.opt2", + RoomDisposition.UNSTABLE, + EventFormatVersions.ROOM_V4_PLUS, + StateResolutionVersions.V2, + enforce_key_validity=True, + special_case_aliases_auth=False, + strict_canonicaljson=True, + limit_notifications_power_levels=True, + msc2175_implicit_room_creator=True, # Used by MSC3820 + msc2176_redaction_rules=True, # Used by MSC3820 + msc3083_join_rules=True, + msc3375_redaction_rules=True, + msc2403_knocking=True, + msc2716_historical=False, + msc2716_redactions=False, + msc3389_relation_redactions=False, + msc3787_knock_restricted_join_rule=True, + msc3667_int_only_power_levels=True, + msc3821_redaction_rules=True, # Used by MSC3820 + 
msc3931_push_features=(), + msc3989_redaction_rules=True, # Used by MSC3820 + ) KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = { diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index b05fe2c589..f9aada269a 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -64,7 +64,7 @@ from synapse.util.logcontext import LoggingContext logger = logging.getLogger("synapse.app.admin_cmd") -class AdminCmdSlavedStore( +class AdminCmdStore( FilteringWorkerStore, ClientIpWorkerStore, DeviceWorkerStore, @@ -103,7 +103,7 @@ class AdminCmdSlavedStore( class AdminCmdServer(HomeServer): - DATASTORE_CLASS = AdminCmdSlavedStore # type: ignore + DATASTORE_CLASS = AdminCmdStore # type: ignore async def export_data_command(hs: HomeServer, args: argparse.Namespace) -> None: diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index e17ce35b8e..909ebccf78 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -102,7 +102,7 @@ from synapse.util.httpresourcetree import create_resource_tree logger = logging.getLogger("synapse.app.generic_worker") -class GenericWorkerSlavedStore( +class GenericWorkerStore( # FIXME(#3714): We need to add UserDirectoryStore as we write directly # rather than going via the correct worker. 
UserDirectoryStore, @@ -154,7 +154,7 @@ class GenericWorkerSlavedStore( class GenericWorkerServer(HomeServer): - DATASTORE_CLASS = GenericWorkerSlavedStore # type: ignore + DATASTORE_CLASS = GenericWorkerStore # type: ignore def _listen_http(self, listener_config: ListenerConfig) -> None: assert listener_config.http_options is not None diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py index 897dd3edac..09988670da 100644 --- a/synapse/app/phone_stats_home.py +++ b/synapse/app/phone_stats_home.py @@ -127,10 +127,6 @@ async def phone_stats_home( daily_sent_messages = await store.count_daily_sent_messages() stats["daily_sent_messages"] = daily_sent_messages - r30_results = await store.count_r30_users() - for name, count in r30_results.items(): - stats["r30_users_" + name] = count - r30v2_results = await store.count_r30v2_users() for name, count in r30v2_results.items(): stats["r30v2_users_" + name] = count diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 35c330a3c4..2260a8f589 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -86,6 +86,7 @@ class ApplicationService: url.rstrip("/") if isinstance(url, str) else None ) # url must not end with a slash self.hs_token = hs_token + # The full Matrix ID for this application service's sender. self.sender = sender self.namespaces = self._check_namespaces(namespaces) self.id = id @@ -212,7 +213,7 @@ class ApplicationService: True if the application service is interested in the user, False if not. 
""" return ( - # User is the appservice's sender_localpart user + # User is the appservice's configured sender_localpart user user_id == self.sender # User is in the appservice's user namespace or self.is_user_in_namespace(user_id) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 2ce60610ca..1d268a1817 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -44,6 +44,7 @@ import jinja2 import pkg_resources import yaml +from synapse.types import StrSequence from synapse.util.templates import _create_mxc_to_http_filter, _format_ts_filter logger = logging.getLogger(__name__) @@ -58,7 +59,7 @@ class ConfigError(Exception): the problem lies. """ - def __init__(self, msg: str, path: Optional[Iterable[str]] = None): + def __init__(self, msg: str, path: Optional[StrSequence] = None): self.msg = msg self.path = path diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi index b5cec132b4..fc51aed234 100644 --- a/synapse/config/_base.pyi +++ b/synapse/config/_base.pyi @@ -61,9 +61,10 @@ from synapse.config import ( # noqa: F401 voip, workers, ) +from synapse.types import StrSequence class ConfigError(Exception): - def __init__(self, msg: str, path: Optional[Iterable[str]] = None): + def __init__(self, msg: str, path: Optional[StrSequence] = None): self.msg = msg self.path = path diff --git a/synapse/config/_util.py b/synapse/config/_util.py index dfc5d12210..acccca413b 100644 --- a/synapse/config/_util.py +++ b/synapse/config/_util.py @@ -11,17 +11,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-from typing import Any, Dict, Iterable, Type, TypeVar +from typing import Any, Dict, Type, TypeVar import jsonschema from pydantic import BaseModel, ValidationError, parse_obj_as from synapse.config._base import ConfigError -from synapse.types import JsonDict +from synapse.types import JsonDict, StrSequence def validate_config( - json_schema: JsonDict, config: Any, config_path: Iterable[str] + json_schema: JsonDict, config: Any, config_path: StrSequence ) -> None: """Validates a config setting against a JsonSchema definition @@ -45,7 +45,7 @@ def validate_config( def json_error_to_config_error( - e: jsonschema.ValidationError, config_path: Iterable[str] + e: jsonschema.ValidationError, config_path: StrSequence ) -> ConfigError: """Converts a json validation error to a user-readable ConfigError diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index fd89960e72..c2710fdf04 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -36,11 +36,10 @@ class AppServiceConfig(Config): if not isinstance(self.app_service_config_files, list) or not all( type(x) is str for x in self.app_service_config_files ): - # type-ignore: this function gets arbitrary json value; we do use this path. raise ConfigError( "Expected '%s' to be a list of AS config files:" % (self.app_service_config_files), - "app_service_config_files", + ("app_service_config_files",), ) self.track_appservice_user_ips = config.get("track_appservice_user_ips", False) diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 6e453bd963..d769b7f668 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -84,18 +84,6 @@ class ExperimentalConfig(Config): "msc3984_appservice_key_query", False ) - # MSC3706 (server-side support for partial state in /send_join responses) - # Synapse will always serve partial state responses to requests using the stable - # query parameter `omit_members`. 
If this flag is set, Synapse will also serve - # partial state responses to requests using the unstable query parameter - # `org.matrix.msc3706.partial_state`. - self.msc3706_enabled: bool = experimental.get("msc3706_enabled", False) - - # experimental support for faster joins over federation - # (MSC2775, MSC3706, MSC3895) - # requires a target server that can provide a partial join response (MSC3706) - self.faster_joins_enabled: bool = experimental.get("faster_joins", True) - # MSC3720 (Account status endpoint) self.msc3720_enabled: bool = experimental.get("msc3720_enabled", False) diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 56db875b25..1e080133dc 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -117,9 +117,7 @@ root: # Write logs to the `buffer` handler, which will buffer them together in memory, # then write them to a file. # - # Replace "buffer" with "console" to log to stderr instead. (Note that you'll - # also need to update the configuration for the `twisted` logger above, in - # this case.) + # Replace "buffer" with "console" to log to stderr instead. # handlers: [buffer] diff --git a/synapse/config/oembed.py b/synapse/config/oembed.py index 0d32aba70a..d7959639ee 100644 --- a/synapse/config/oembed.py +++ b/synapse/config/oembed.py @@ -19,7 +19,7 @@ from urllib import parse as urlparse import attr import pkg_resources -from synapse.types import JsonDict +from synapse.types import JsonDict, StrSequence from ._base import Config, ConfigError from ._util import validate_config @@ -80,7 +80,7 @@ class OembedConfig(Config): ) def _parse_and_validate_provider( - self, providers: List[JsonDict], config_path: Iterable[str] + self, providers: List[JsonDict], config_path: StrSequence ) -> Iterable[OEmbedEndpointConfig]: # Ensure it is the proper form. 
validate_config( @@ -112,7 +112,7 @@ class OembedConfig(Config): api_endpoint, patterns, endpoint.get("formats") ) - def _glob_to_pattern(self, glob: str, config_path: Iterable[str]) -> Pattern: + def _glob_to_pattern(self, glob: str, config_path: StrSequence) -> Pattern: """ Convert the glob into a sane regular expression to match against. The rules followed will be slightly different for the domain portion vs. diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 655f06505b..f6cfdd3e04 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -224,20 +224,20 @@ class ContentRepositoryConfig(Config): if "http" in proxy_env or "https" in proxy_env: logger.warning("".join(HTTP_PROXY_SET_WARNING)) - # we always blacklist '0.0.0.0' and '::', which are supposed to be + # we always block '0.0.0.0' and '::', which are supposed to be # unroutable addresses. - self.url_preview_ip_range_blacklist = generate_ip_set( + self.url_preview_ip_range_blocklist = generate_ip_set( config["url_preview_ip_range_blacklist"], ["0.0.0.0", "::"], config_path=("url_preview_ip_range_blacklist",), ) - self.url_preview_ip_range_whitelist = generate_ip_set( + self.url_preview_ip_range_allowlist = generate_ip_set( config.get("url_preview_ip_range_whitelist", ()), config_path=("url_preview_ip_range_whitelist",), ) - self.url_preview_url_blacklist = config.get("url_preview_url_blacklist", ()) + self.url_preview_url_blocklist = config.get("url_preview_url_blacklist", ()) self.url_preview_accept_language = config.get( "url_preview_accept_language" diff --git a/synapse/config/server.py b/synapse/config/server.py index 386c3194b8..b46fa51593 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -27,7 +27,7 @@ from netaddr import AddrFormatError, IPNetwork, IPSet from twisted.conch.ssh.keys import Key from synapse.api.room_versions import KNOWN_ROOM_VERSIONS -from synapse.types import JsonDict +from synapse.types import JsonDict, 
StrSequence from synapse.util.module_loader import load_module from synapse.util.stringutils import parse_and_validate_server_name @@ -73,7 +73,7 @@ def _6to4(network: IPNetwork) -> IPNetwork: def generate_ip_set( ip_addresses: Optional[Iterable[str]], extra_addresses: Optional[Iterable[str]] = None, - config_path: Optional[Iterable[str]] = None, + config_path: Optional[StrSequence] = None, ) -> IPSet: """ Generate an IPSet from a list of IP addresses or CIDRs. @@ -115,7 +115,7 @@ def generate_ip_set( # IP ranges that are considered private / unroutable / don't make sense. -DEFAULT_IP_RANGE_BLACKLIST = [ +DEFAULT_IP_RANGE_BLOCKLIST = [ # Localhost "127.0.0.0/8", # Private networks. @@ -501,36 +501,36 @@ class ServerConfig(Config): # due to resource constraints self.admin_contact = config.get("admin_contact", None) - ip_range_blacklist = config.get( - "ip_range_blacklist", DEFAULT_IP_RANGE_BLACKLIST + ip_range_blocklist = config.get( + "ip_range_blacklist", DEFAULT_IP_RANGE_BLOCKLIST ) # Attempt to create an IPSet from the given ranges - # Always blacklist 0.0.0.0, :: - self.ip_range_blacklist = generate_ip_set( - ip_range_blacklist, ["0.0.0.0", "::"], config_path=("ip_range_blacklist",) + # Always block 0.0.0.0, :: + self.ip_range_blocklist = generate_ip_set( + ip_range_blocklist, ["0.0.0.0", "::"], config_path=("ip_range_blacklist",) ) - self.ip_range_whitelist = generate_ip_set( + self.ip_range_allowlist = generate_ip_set( config.get("ip_range_whitelist", ()), config_path=("ip_range_whitelist",) ) # The federation_ip_range_blacklist is used for backwards-compatibility # and only applies to federation and identity servers. 
if "federation_ip_range_blacklist" in config: - # Always blacklist 0.0.0.0, :: - self.federation_ip_range_blacklist = generate_ip_set( + # Always block 0.0.0.0, :: + self.federation_ip_range_blocklist = generate_ip_set( config["federation_ip_range_blacklist"], ["0.0.0.0", "::"], config_path=("federation_ip_range_blacklist",), ) # 'federation_ip_range_whitelist' was never a supported configuration option. - self.federation_ip_range_whitelist = None + self.federation_ip_range_allowlist = None else: # No backwards-compatiblity requrired, as federation_ip_range_blacklist # is not given. Default to ip_range_blacklist and ip_range_whitelist. - self.federation_ip_range_blacklist = self.ip_range_blacklist - self.federation_ip_range_whitelist = self.ip_range_whitelist + self.federation_ip_range_blocklist = self.ip_range_blocklist + self.federation_ip_range_allowlist = self.ip_range_allowlist # (undocumented) option for torturing the worker-mode replication a bit, # for testing. The value defines the number of milliseconds to pause before diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 9b4d692cf4..e7e8225b8e 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -19,6 +19,7 @@ from immutabledict import immutabledict from synapse.appservice import ApplicationService from synapse.events import EventBase +from synapse.logging.opentracing import tag_args, trace from synapse.types import JsonDict, StateMap if TYPE_CHECKING: @@ -242,6 +243,8 @@ class EventContext(UnpersistedEventContextBase): return self._state_group + @trace + @tag_args async def get_current_state_ids( self, state_filter: Optional["StateFilter"] = None ) -> Optional[StateMap[str]]: @@ -275,6 +278,8 @@ class EventContext(UnpersistedEventContextBase): return prev_state_ids + @trace + @tag_args async def get_prev_state_ids( self, state_filter: Optional["StateFilter"] = None ) -> StateMap[str]: diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 
e6d040176b..e7b7b78b84 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -22,6 +22,7 @@ from typing import ( Iterable, List, Mapping, + Match, MutableMapping, Optional, Union, @@ -46,12 +47,10 @@ if TYPE_CHECKING: from synapse.handlers.relations import BundledAggregations -# Split strings on "." but not "\." This uses a negative lookbehind assertion for '\' -# (?<!stuff) matches if the current position in the string is not preceded -# by a match for 'stuff'. -# TODO: This is fast, but fails to handle "foo\\.bar" which should be treated as -# the literal fields "foo\" and "bar" but will instead be treated as "foo\\.bar" -SPLIT_FIELD_REGEX = re.compile(r"(?<!\\)\.") +# Split strings on "." but not "\." (or "\\\."). +SPLIT_FIELD_REGEX = re.compile(r"\\*\.") +# Find escaped characters, e.g. those with a \ in front of them. +ESCAPE_SEQUENCE_PATTERN = re.compile(r"\\(.)") CANONICALJSON_MAX_INT = (2**53) - 1 CANONICALJSON_MIN_INT = -CANONICALJSON_MAX_INT @@ -253,6 +252,57 @@ def _copy_field(src: JsonDict, dst: JsonDict, field: List[str]) -> None: sub_out_dict[key_to_move] = sub_dict[key_to_move] +def _escape_slash(m: Match[str]) -> str: + """ + Replacement function; replace a backslash-backslash or backslash-dot with the + second character. Leaves any other string alone. + """ + if m.group(1) in ("\\", "."): + return m.group(1) + return m.group(0) + + +def _split_field(field: str) -> List[str]: + """ + Splits strings on unescaped dots and removes escaping. + + Args: + field: A string representing a path to a field. + + Returns: + A list of nested fields to traverse. + """ + + # Convert the field and remove escaping: + # + # 1. "content.body.thing\.with\.dots" + # 2. ["content", "body", "thing\.with\.dots"] + # 3. ["content", "body", "thing.with.dots"] + + # Find all dots (and their preceding backslashes). If the dot is unescaped + # then emit a new field part. 
+ result = [] + prev_start = 0 + for match in SPLIT_FIELD_REGEX.finditer(field): + # If the match is an *even* number of characters than the dot was escaped. + if len(match.group()) % 2 == 0: + continue + + # Add a new part (up to the dot, exclusive) after escaping. + result.append( + ESCAPE_SEQUENCE_PATTERN.sub( + _escape_slash, field[prev_start : match.end() - 1] + ) + ) + prev_start = match.end() + + # Add any part of the field after the last unescaped dot. (Note that if the + # character is a dot this correctly adds a blank string.) + result.append(re.sub(r"\\(.)", _escape_slash, field[prev_start:])) + + return result + + def only_fields(dictionary: JsonDict, fields: List[str]) -> JsonDict: """Return a new dict with only the fields in 'dictionary' which are present in 'fields'. @@ -260,7 +310,7 @@ def only_fields(dictionary: JsonDict, fields: List[str]) -> JsonDict: If there are no event fields specified then all fields are included. The entries may include '.' characters to indicate sub-fields. So ['content.body'] will include the 'body' field of the 'content' object. - A literal '.' character in a field name may be escaped using a '\'. + A literal '.' or '\' character in a field name may be escaped using a '\'. Args: dictionary: The dictionary to read from. @@ -275,13 +325,7 @@ def only_fields(dictionary: JsonDict, fields: List[str]) -> JsonDict: # for each field, convert it: # ["content.body.thing\.with\.dots"] => [["content", "body", "thing\.with\.dots"]] - split_fields = [SPLIT_FIELD_REGEX.split(f) for f in fields] - - # for each element of the output array of arrays: - # remove escaping so we can use the right key names. 
- split_fields[:] = [ - [f.replace(r"\.", r".") for f in field_array] for field_array in split_fields - ] + split_fields = [_split_field(f) for f in fields] output: JsonDict = {} for field_array in split_fields: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 076b9287c6..a2cf3a96c6 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -236,6 +236,7 @@ class FederationClient(FederationBase): async def claim_client_keys( self, + user: UserID, destination: str, query: Dict[str, Dict[str, Dict[str, int]]], timeout: Optional[int], @@ -243,6 +244,7 @@ class FederationClient(FederationBase): """Claims one-time keys for a device hosted on a remote server. Args: + user: The user id of the requesting user destination: Domain name of the remote homeserver content: The query content. @@ -279,7 +281,7 @@ class FederationClient(FederationBase): if use_unstable: try: return await self.transport_layer.claim_client_keys_unstable( - destination, unstable_content, timeout + user, destination, unstable_content, timeout ) except HttpResponseException as e: # If an error is received that is due to an unrecognised endpoint, @@ -295,7 +297,7 @@ class FederationClient(FederationBase): logger.debug("Skipping unstable claim client keys API") return await self.transport_layer.claim_client_keys( - destination, content, timeout + user, destination, content, timeout ) @trace diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index c590d8f96f..f4ca70a698 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -739,12 +739,10 @@ class FederationServer(FederationBase): "event": event_json, "state": [p.get_pdu_json(time_now) for p in state_events], "auth_chain": [p.get_pdu_json(time_now) for p in auth_chain_events], - "org.matrix.msc3706.partial_state": caller_supports_partial_state, "members_omitted": 
caller_supports_partial_state, } if servers_in_room is not None: - resp["org.matrix.msc3706.servers_in_room"] = list(servers_in_room) resp["servers_in_room"] = list(servers_in_room) return resp diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index d2fa9976da..0b17f713ea 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -45,7 +45,7 @@ from synapse.events import EventBase, make_event_from_dict from synapse.federation.units import Transaction from synapse.http.matrixfederationclient import ByteParser, LegacyJsonSendParser from synapse.http.types import QueryParams -from synapse.types import JsonDict +from synapse.types import JsonDict, UserID from synapse.util import ExceptionBundle if TYPE_CHECKING: @@ -59,7 +59,6 @@ class TransportLayerClient: def __init__(self, hs: "HomeServer"): self.client = hs.get_federation_http_client() - self._faster_joins_enabled = hs.config.experimental.faster_joins_enabled self._is_mine_server_name = hs.is_mine_server_name async def get_room_state_ids( @@ -363,12 +362,8 @@ class TransportLayerClient: ) -> "SendJoinResponse": path = _create_v2_path("/send_join/%s/%s", room_id, event_id) query_params: Dict[str, str] = {} - if self._faster_joins_enabled: - # lazy-load state on join - query_params["org.matrix.msc3706.partial_state"] = ( - "true" if omit_members else "false" - ) - query_params["omit_members"] = "true" if omit_members else "false" + # lazy-load state on join + query_params["omit_members"] = "true" if omit_members else "false" return await self.client.put_json( destination=destination, @@ -635,7 +630,11 @@ class TransportLayerClient: ) async def claim_client_keys( - self, destination: str, query_content: JsonDict, timeout: Optional[int] + self, + user: UserID, + destination: str, + query_content: JsonDict, + timeout: Optional[int], ) -> JsonDict: """Claim one-time keys for a list of devices hosted on a remote server. 
@@ -660,6 +659,7 @@ class TransportLayerClient: } Args: + user: the user_id of the requesting user destination: The server to query. query_content: The user ids to query. Returns: @@ -676,7 +676,11 @@ class TransportLayerClient: ) async def claim_client_keys_unstable( - self, destination: str, query_content: JsonDict, timeout: Optional[int] + self, + user: UserID, + destination: str, + query_content: JsonDict, + timeout: Optional[int], ) -> JsonDict: """Claim one-time keys for a list of devices hosted on a remote server. @@ -701,6 +705,7 @@ class TransportLayerClient: } Args: + user: the user_id of the requesting user destination: The server to query. query_content: The user ids to query. Returns: @@ -902,9 +907,7 @@ def _members_omitted_parser(response: SendJoinResponse) -> Generator[None, Any, while True: val = yield if not isinstance(val, bool): - raise TypeError( - "members_omitted (formerly org.matrix.msc370c.partial_state) must be a boolean" - ) + raise TypeError("members_omitted must be a boolean") response.members_omitted = val @@ -967,27 +970,11 @@ class SendJoinParser(ByteParser[SendJoinResponse]): self._coros.append( ijson.items_coro( _members_omitted_parser(self._response), - "org.matrix.msc3706.partial_state", - use_float="True", - ) - ) - # The stable field name comes last, so it "wins" if the fields disagree - self._coros.append( - ijson.items_coro( - _members_omitted_parser(self._response), "members_omitted", use_float="True", ) ) - self._coros.append( - ijson.items_coro( - _servers_in_room_parser(self._response), - "org.matrix.msc3706.servers_in_room", - use_float="True", - ) - ) - # Again, stable field name comes last self._coros.append( ijson.items_coro( diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index 36b0362504..3a744e25be 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -440,7 +440,6 @@ class 
FederationV2SendJoinServlet(BaseFederationServerServlet): server_name: str, ): super().__init__(hs, authenticator, ratelimiter, server_name) - self._read_msc3706_query_param = hs.config.experimental.msc3706_enabled async def on_PUT( self, @@ -453,16 +452,7 @@ class FederationV2SendJoinServlet(BaseFederationServerServlet): # TODO(paul): assert that event_id parsed from path actually # match those given in content - partial_state = False - # The stable query parameter wins, if it disagrees with the unstable - # parameter for some reason. - stable_param = parse_boolean_from_args(query, "omit_members", default=None) - if stable_param is not None: - partial_state = stable_param - elif self._read_msc3706_query_param: - partial_state = parse_boolean_from_args( - query, "org.matrix.msc3706.partial_state", default=False - ) + partial_state = parse_boolean_from_args(query, "omit_members", default=False) result = await self.handler.on_send_join_request( origin, content, room_id, caller_supports_partial_state=partial_state diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 59e340974d..d001f2fb2f 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -52,7 +52,6 @@ from synapse.api.errors import ( NotFoundError, StoreError, SynapseError, - UserDeactivatedError, ) from synapse.api.ratelimiting import Ratelimiter from synapse.handlers.ui_auth import ( @@ -1419,12 +1418,6 @@ class AuthHandler: return None (user_id, password_hash) = lookupres - # If the password hash is None, the account has likely been deactivated - if not password_hash: - deactivated = await self.store.get_user_deactivated_status(user_id) - if deactivated: - raise UserDeactivatedError("This account has been deactivated") - result = await self.validate_hash(password, password_hash) if not result: logger.warning("Failed password login for user %s", user_id) @@ -1749,8 +1742,11 @@ class AuthHandler: registered. 
auth_provider_session_id: The session ID from the SSO IdP received during login. """ - # If the account has been deactivated, do not proceed with the login - # flow. + # If the account has been deactivated, do not proceed with the login. + # + # This gets checked again when the token is submitted but this lets us + # provide an HTML error page to the user (instead of issuing a token and + # having it error later). deactivated = await self.store.get_user_deactivated_status(registered_user_id) if deactivated: respond_with_html(request, 403, self._sso_account_deactivated_template) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 24741b667b..ad075497c8 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -661,6 +661,7 @@ class E2eKeysHandler: async def claim_one_time_keys( self, query: Dict[str, Dict[str, Dict[str, int]]], + user: UserID, timeout: Optional[int], always_include_fallback_keys: bool, ) -> JsonDict: @@ -703,7 +704,7 @@ class E2eKeysHandler: device_keys = remote_queries[destination] try: remote_result = await self.federation.claim_client_keys( - destination, device_keys, timeout=timeout + user, destination, device_keys, timeout=timeout ) for user_id, keys in remote_result["one_time_keys"].items(): if user_id in device_keys: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 19dec4812f..2eb28d55ac 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -148,7 +148,7 @@ class FederationHandler: self._event_auth_handler = hs.get_event_auth_handler() self._server_notices_mxid = hs.config.servernotices.server_notices_mxid self.config = hs.config - self.http_client = hs.get_proxied_blacklisted_http_client() + self.http_client = hs.get_proxied_blocklisted_http_client() self._replication = hs.get_replication_data_handler() self._federation_event_handler = hs.get_federation_event_handler() self._device_handler = hs.get_device_handler() diff --git 
a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 06343d40e4..42141d3670 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -88,7 +88,7 @@ from synapse.types import ( ) from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer, concurrently_execute -from synapse.util.iterutils import batch_iter +from synapse.util.iterutils import batch_iter, partition from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr @@ -865,7 +865,7 @@ class FederationEventHandler: [event.event_id for event in events] ) - new_events = [] + new_events: List[EventBase] = [] for event in events: event_id = event.event_id @@ -890,12 +890,71 @@ class FederationEventHandler: # Continue on with the events that are new to us. new_events.append(event) - # We want to sort these by depth so we process them and - # tell clients about them in order. - sorted_events = sorted(new_events, key=lambda x: x.depth) - for ev in sorted_events: - with nested_logging_context(ev.event_id): - await self._process_pulled_event(origin, ev, backfilled=backfilled) + set_tag( + SynapseTags.RESULT_PREFIX + "new_events.length", + str(len(new_events)), + ) + + @trace + async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None: + # We want to sort these by depth so we process them and tell clients about + # them in order. It's also more efficient to backfill this way (`depth` + # ascending) because one backfill event is likely to be the `prev_event` of + # the next event we're going to process. + sorted_events = sorted(new_events, key=lambda x: x.depth) + for ev in sorted_events: + with nested_logging_context(ev.event_id): + await self._process_pulled_event(origin, ev, backfilled=backfilled) + + # Check if we've already tried to process these events at some point in the + # past. 
We aren't concerned with the expontntial backoff here, just whether it + # has failed to be processed before. + event_ids_with_failed_pull_attempts = ( + await self._store.get_event_ids_with_failed_pull_attempts( + [event.event_id for event in new_events] + ) + ) + + # We construct the event lists in source order from `/backfill` response because + # it's a) easiest, but also b) the order in which we process things matters for + # MSC2716 historical batches because many historical events are all at the same + # `depth` and we rely on the tenuous sort that the other server gave us and hope + # they're doing their best. The brittle nature of this ordering for historical + # messages over federation is one of the reasons why we don't want to continue + # on MSC2716 until we have online topological ordering. + events_with_failed_pull_attempts, fresh_events = partition( + new_events, lambda e: e.event_id in event_ids_with_failed_pull_attempts + ) + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "events_with_failed_pull_attempts", + str(event_ids_with_failed_pull_attempts), + ) + set_tag( + SynapseTags.RESULT_PREFIX + "events_with_failed_pull_attempts.length", + str(len(events_with_failed_pull_attempts)), + ) + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "fresh_events", + str([event.event_id for event in fresh_events]), + ) + set_tag( + SynapseTags.RESULT_PREFIX + "fresh_events.length", + str(len(fresh_events)), + ) + + # Process previously failed backfill events in the background to not waste + # time on something that is likely to fail again. + if len(events_with_failed_pull_attempts) > 0: + run_as_background_process( + "_process_new_pulled_events_with_failed_pull_attempts", + _process_new_pulled_events, + events_with_failed_pull_attempts, + ) + + # We can optimistically try to process and wait for the event to be fully + # persisted if we've never tried before. 
+ if len(fresh_events) > 0: + await _process_new_pulled_events(fresh_events) @trace @tag_args diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index bf0f7acf80..3031384d25 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -52,10 +52,10 @@ class IdentityHandler: # An HTTP client for contacting trusted URLs. self.http_client = SimpleHttpClient(hs) # An HTTP client for contacting identity servers specified by clients. - self.blacklisting_http_client = SimpleHttpClient( + self._http_client = SimpleHttpClient( hs, - ip_blacklist=hs.config.server.federation_ip_range_blacklist, - ip_whitelist=hs.config.server.federation_ip_range_whitelist, + ip_blocklist=hs.config.server.federation_ip_range_blocklist, + ip_allowlist=hs.config.server.federation_ip_range_allowlist, ) self.federation_http_client = hs.get_federation_http_client() self.hs = hs @@ -197,7 +197,7 @@ class IdentityHandler: try: # Use the blacklisting http client as this call is only to identity servers # provided by a client - data = await self.blacklisting_http_client.post_json_get_json( + data = await self._http_client.post_json_get_json( bind_url, bind_data, headers=headers ) @@ -308,9 +308,7 @@ class IdentityHandler: try: # Use the blacklisting http client as this call is only to identity servers # provided by a client - await self.blacklisting_http_client.post_json_get_json( - url, content, headers - ) + await self._http_client.post_json_get_json(url, content, headers) changed = True except HttpResponseException as e: changed = False @@ -579,7 +577,7 @@ class IdentityHandler: """ # Check what hashing details are supported by this identity server try: - hash_details = await self.blacklisting_http_client.get_json( + hash_details = await self._http_client.get_json( "%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server), {"access_token": id_access_token}, ) @@ -646,7 +644,7 @@ class IdentityHandler: headers = {"Authorization": 
create_id_access_token_header(id_access_token)} try: - lookup_results = await self.blacklisting_http_client.post_json_get_json( + lookup_results = await self._http_client.post_json_get_json( "%s%s/_matrix/identity/v2/lookup" % (id_server_scheme, id_server), { "addresses": [lookup_value], @@ -752,7 +750,7 @@ class IdentityHandler: url = "%s%s/_matrix/identity/v2/store-invite" % (id_server_scheme, id_server) try: - data = await self.blacklisting_http_client.post_json_get_json( + data = await self._http_client.post_json_get_json( url, invite_config, {"Authorization": create_id_access_token_header(id_access_token)}, diff --git a/synapse/handlers/jwt.py b/synapse/handlers/jwt.py new file mode 100644 index 0000000000..740bf9b3c4 --- /dev/null +++ b/synapse/handlers/jwt.py @@ -0,0 +1,105 @@ +# Copyright 2023 Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from typing import TYPE_CHECKING + +from authlib.jose import JsonWebToken, JWTClaims +from authlib.jose.errors import BadSignatureError, InvalidClaimError, JoseError + +from synapse.api.errors import Codes, LoginError +from synapse.types import JsonDict, UserID + +if TYPE_CHECKING: + from synapse.server import HomeServer + + +class JwtHandler: + def __init__(self, hs: "HomeServer"): + self.hs = hs + + self.jwt_secret = hs.config.jwt.jwt_secret + self.jwt_subject_claim = hs.config.jwt.jwt_subject_claim + self.jwt_algorithm = hs.config.jwt.jwt_algorithm + self.jwt_issuer = hs.config.jwt.jwt_issuer + self.jwt_audiences = hs.config.jwt.jwt_audiences + + def validate_login(self, login_submission: JsonDict) -> str: + """ + Authenticates the user for the /login API + + Args: + login_submission: the whole of the login submission + (including 'type' and other relevant fields) + + Returns: + The user ID that is logging in. + + Raises: + LoginError if there was an authentication problem. + """ + token = login_submission.get("token", None) + if token is None: + raise LoginError( + 403, "Token field for JWT is missing", errcode=Codes.FORBIDDEN + ) + + jwt = JsonWebToken([self.jwt_algorithm]) + claim_options = {} + if self.jwt_issuer is not None: + claim_options["iss"] = {"value": self.jwt_issuer, "essential": True} + if self.jwt_audiences is not None: + claim_options["aud"] = {"values": self.jwt_audiences, "essential": True} + + try: + claims = jwt.decode( + token, + key=self.jwt_secret, + claims_cls=JWTClaims, + claims_options=claim_options, + ) + except BadSignatureError: + # We handle this case separately to provide a better error message + raise LoginError( + 403, + "JWT validation failed: Signature verification failed", + errcode=Codes.FORBIDDEN, + ) + except JoseError as e: + # A JWT error occurred, return some info back to the client. 
+ raise LoginError( + 403, + "JWT validation failed: %s" % (str(e),), + errcode=Codes.FORBIDDEN, + ) + + try: + claims.validate(leeway=120) # allows 2 min of clock skew + + # Enforce the old behavior which is rolled out in productive + # servers: if the JWT contains an 'aud' claim but none is + # configured, the login attempt will fail + if claims.get("aud") is not None: + if self.jwt_audiences is None or len(self.jwt_audiences) == 0: + raise InvalidClaimError("aud") + except JoseError as e: + raise LoginError( + 403, + "JWT validation failed: %s" % (str(e),), + errcode=Codes.FORBIDDEN, + ) + + user = claims.get(self.jwt_subject_claim, None) + if user is None: + raise LoginError(403, "Invalid JWT", errcode=Codes.FORBIDDEN) + + return UserID(user, self.hs.hostname).to_string() diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 6d35e61880..49a497a860 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -16,6 +16,7 @@ import logging from typing import TYPE_CHECKING from synapse.api.constants import ReceiptTypes +from synapse.api.errors import SynapseError from synapse.util.async_helpers import Linearizer if TYPE_CHECKING: @@ -47,12 +48,21 @@ class ReadMarkerHandler: ) should_update = True + # Get event ordering, this also ensures we know about the event + event_ordering = await self.store.get_event_ordering(event_id) if existing_read_marker: - # Only update if the new marker is ahead in the stream - should_update = await self.store.is_event_after( - event_id, existing_read_marker["event_id"] - ) + try: + old_event_ordering = await self.store.get_event_ordering( + existing_read_marker["event_id"] + ) + except SynapseError: + # Old event no longer exists, assume new is ahead. This may + # happen if the old event was removed due to retention. 
+ pass + else: + # Only update if the new marker is ahead in the stream + should_update = event_ordering > old_event_ordering if should_update: content = {"event_id": event_id} diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py index 25fd2eb3a1..c3a51722bd 100644 --- a/synapse/handlers/sso.py +++ b/synapse/handlers/sso.py @@ -204,7 +204,7 @@ class SsoHandler: self._media_repo = ( hs.get_media_repository() if hs.config.media.can_load_media_repo else None ) - self._http_client = hs.get_proxied_blacklisted_http_client() + self._http_client = hs.get_proxied_blocklisted_http_client() # The following template is shown after a successful user interactive # authentication session. It tells the user they can close the window. diff --git a/synapse/http/client.py b/synapse/http/client.py index c9479c81ff..09ea93e10d 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -117,22 +117,22 @@ RawHeaderValue = Union[ ] -def check_against_blacklist( - ip_address: IPAddress, ip_whitelist: Optional[IPSet], ip_blacklist: IPSet +def _is_ip_blocked( + ip_address: IPAddress, allowlist: Optional[IPSet], blocklist: IPSet ) -> bool: """ Compares an IP address to allowed and disallowed IP sets. Args: ip_address: The IP address to check - ip_whitelist: Allowed IP addresses. - ip_blacklist: Disallowed IP addresses. + allowlist: Allowed IP addresses. + blocklist: Disallowed IP addresses. Returns: - True if the IP address is in the blacklist and not in the whitelist. + True if the IP address is in the blocklist and not in the allowlist. 
""" - if ip_address in ip_blacklist: - if ip_whitelist is None or ip_address not in ip_whitelist: + if ip_address in blocklist: + if allowlist is None or ip_address not in allowlist: return True return False @@ -154,27 +154,27 @@ def _make_scheduler( return _scheduler -class _IPBlacklistingResolver: +class _IPBlockingResolver: """ - A proxy for reactor.nameResolver which only produces non-blacklisted IP - addresses, preventing DNS rebinding attacks on URL preview. + A proxy for reactor.nameResolver which only produces non-blocklisted IP + addresses, preventing DNS rebinding attacks. """ def __init__( self, reactor: IReactorPluggableNameResolver, - ip_whitelist: Optional[IPSet], - ip_blacklist: IPSet, + ip_allowlist: Optional[IPSet], + ip_blocklist: IPSet, ): """ Args: reactor: The twisted reactor. - ip_whitelist: IP addresses to allow. - ip_blacklist: IP addresses to disallow. + ip_allowlist: IP addresses to allow. + ip_blocklist: IP addresses to disallow. """ self._reactor = reactor - self._ip_whitelist = ip_whitelist - self._ip_blacklist = ip_blacklist + self._ip_allowlist = ip_allowlist + self._ip_blocklist = ip_blocklist def resolveHostName( self, recv: IResolutionReceiver, hostname: str, portNumber: int = 0 @@ -191,16 +191,13 @@ class _IPBlacklistingResolver: ip_address = IPAddress(address.host) - if check_against_blacklist( - ip_address, self._ip_whitelist, self._ip_blacklist - ): + if _is_ip_blocked(ip_address, self._ip_allowlist, self._ip_blocklist): logger.info( - "Dropped %s from DNS resolution to %s due to blacklist" - % (ip_address, hostname) + "Blocked %s from DNS resolution to %s" % (ip_address, hostname) ) has_bad_ip = True - # if we have a blacklisted IP, we'd like to raise an error to block the + # if we have a blocked IP, we'd like to raise an error to block the # request, but all we can really do from here is claim that there were no # valid results. 
if not has_bad_ip: @@ -232,24 +229,24 @@ class _IPBlacklistingResolver: # ISynapseReactor implies IReactorCore, but explicitly marking this as an implementer # of IReactorCore seems to keep mypy-zope happier. @implementer(IReactorCore, ISynapseReactor) -class BlacklistingReactorWrapper: +class BlocklistingReactorWrapper: """ - A Reactor wrapper which will prevent DNS resolution to blacklisted IP + A Reactor wrapper which will prevent DNS resolution to blocked IP addresses, to prevent DNS rebinding. """ def __init__( self, reactor: IReactorPluggableNameResolver, - ip_whitelist: Optional[IPSet], - ip_blacklist: IPSet, + ip_allowlist: Optional[IPSet], + ip_blocklist: IPSet, ): self._reactor = reactor - # We need to use a DNS resolver which filters out blacklisted IP + # We need to use a DNS resolver which filters out blocked IP # addresses, to prevent DNS rebinding. - self._nameResolver = _IPBlacklistingResolver( - self._reactor, ip_whitelist, ip_blacklist + self._nameResolver = _IPBlockingResolver( + self._reactor, ip_allowlist, ip_blocklist ) def __getattr__(self, attr: str) -> Any: @@ -260,7 +257,7 @@ class BlacklistingReactorWrapper: return getattr(self._reactor, attr) -class BlacklistingAgentWrapper(Agent): +class BlocklistingAgentWrapper(Agent): """ An Agent wrapper which will prevent access to IP addresses being accessed directly (without an IP address lookup). @@ -269,18 +266,18 @@ class BlacklistingAgentWrapper(Agent): def __init__( self, agent: IAgent, - ip_blacklist: IPSet, - ip_whitelist: Optional[IPSet] = None, + ip_blocklist: IPSet, + ip_allowlist: Optional[IPSet] = None, ): """ Args: agent: The Agent to wrap. - ip_whitelist: IP addresses to allow. - ip_blacklist: IP addresses to disallow. + ip_allowlist: IP addresses to allow. + ip_blocklist: IP addresses to disallow. 
""" self._agent = agent - self._ip_whitelist = ip_whitelist - self._ip_blacklist = ip_blacklist + self._ip_allowlist = ip_allowlist + self._ip_blocklist = ip_blocklist def request( self, @@ -299,13 +296,9 @@ class BlacklistingAgentWrapper(Agent): # Not an IP pass else: - if check_against_blacklist( - ip_address, self._ip_whitelist, self._ip_blacklist - ): - logger.info("Blocking access to %s due to blacklist" % (ip_address,)) - e = SynapseError( - HTTPStatus.FORBIDDEN, "IP address blocked by IP blacklist entry" - ) + if _is_ip_blocked(ip_address, self._ip_allowlist, self._ip_blocklist): + logger.info("Blocking access to %s" % (ip_address,)) + e = SynapseError(HTTPStatus.FORBIDDEN, "IP address blocked") return defer.fail(Failure(e)) return self._agent.request( @@ -763,10 +756,9 @@ class SimpleHttpClient(BaseHttpClient): Args: hs: The HomeServer instance to pass in treq_args: Extra keyword arguments to be given to treq.request. - ip_blacklist: The IP addresses that are blacklisted that - we may not request. - ip_whitelist: The whitelisted IP addresses, that we can - request if it were otherwise caught in a blacklist. + ip_blocklist: The IP addresses that we may not request. + ip_allowlist: The allowed IP addresses, that we can + request if it were otherwise caught in a blocklist. use_proxy: Whether proxy settings should be discovered and used from conventional environment variables. 
""" @@ -775,19 +767,19 @@ class SimpleHttpClient(BaseHttpClient): self, hs: "HomeServer", treq_args: Optional[Dict[str, Any]] = None, - ip_whitelist: Optional[IPSet] = None, - ip_blacklist: Optional[IPSet] = None, + ip_allowlist: Optional[IPSet] = None, + ip_blocklist: Optional[IPSet] = None, use_proxy: bool = False, ): super().__init__(hs, treq_args=treq_args) - self._ip_whitelist = ip_whitelist - self._ip_blacklist = ip_blacklist - - if self._ip_blacklist: - # If we have an IP blacklist, we need to use a DNS resolver which - # filters out blacklisted IP addresses, to prevent DNS rebinding. - self.reactor: ISynapseReactor = BlacklistingReactorWrapper( - self.reactor, self._ip_whitelist, self._ip_blacklist + self._ip_allowlist = ip_allowlist + self._ip_blocklist = ip_blocklist + + if self._ip_blocklist: + # If we have an IP blocklist, we need to use a DNS resolver which + # filters out blocked IP addresses, to prevent DNS rebinding. + self.reactor: ISynapseReactor = BlocklistingReactorWrapper( + self.reactor, self._ip_allowlist, self._ip_blocklist ) # the pusher makes lots of concurrent SSL connections to Sygnal, and tends to @@ -809,14 +801,13 @@ class SimpleHttpClient(BaseHttpClient): use_proxy=use_proxy, ) - if self._ip_blacklist: - # If we have an IP blacklist, we then install the blacklisting Agent - # which prevents direct access to IP addresses, that are not caught - # by the DNS resolution. - self.agent = BlacklistingAgentWrapper( + if self._ip_blocklist: + # If we have an IP blocklist, we then install the Agent which prevents + # direct access to IP addresses, that are not caught by the DNS resolution. 
+ self.agent = BlocklistingAgentWrapper( self.agent, - ip_blacklist=self._ip_blacklist, - ip_whitelist=self._ip_whitelist, + ip_blocklist=self._ip_blocklist, + ip_allowlist=self._ip_allowlist, ) @@ -844,6 +835,7 @@ class ReplicationClient(BaseHttpClient): self.agent: IAgent = ReplicationAgent( hs.get_reactor(), + hs.config.worker.instance_map, contextFactory=hs.get_http_client_context_factory(), pool=pool, ) diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 8d7d0a3875..7e8cf31682 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -36,7 +36,7 @@ from twisted.web.iweb import IAgent, IAgentEndpointFactory, IBodyProducer, IResp from synapse.crypto.context_factory import FederationPolicyForHTTPS from synapse.http import proxyagent -from synapse.http.client import BlacklistingAgentWrapper, BlacklistingReactorWrapper +from synapse.http.client import BlocklistingAgentWrapper, BlocklistingReactorWrapper from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint from synapse.http.federation.srv_resolver import Server, SrvResolver from synapse.http.federation.well_known_resolver import WellKnownResolver @@ -65,12 +65,12 @@ class MatrixFederationAgent: user_agent: The user agent header to use for federation requests. - ip_whitelist: Allowed IP addresses. + ip_allowlist: Allowed IP addresses. - ip_blacklist: Disallowed IP addresses. + ip_blocklist: Disallowed IP addresses. proxy_reactor: twisted reactor to use for connections to the proxy server - reactor might have some blacklisting applied (i.e. for DNS queries), + reactor might have some blocking applied (i.e. for DNS queries), but we need unblocked access to the proxy. 
_srv_resolver: @@ -87,17 +87,17 @@ class MatrixFederationAgent: reactor: ISynapseReactor, tls_client_options_factory: Optional[FederationPolicyForHTTPS], user_agent: bytes, - ip_whitelist: Optional[IPSet], - ip_blacklist: IPSet, + ip_allowlist: Optional[IPSet], + ip_blocklist: IPSet, _srv_resolver: Optional[SrvResolver] = None, _well_known_resolver: Optional[WellKnownResolver] = None, ): - # proxy_reactor is not blacklisted + # proxy_reactor is not blocklisting reactor proxy_reactor = reactor - # We need to use a DNS resolver which filters out blacklisted IP + # We need to use a DNS resolver which filters out blocked IP # addresses, to prevent DNS rebinding. - reactor = BlacklistingReactorWrapper(reactor, ip_whitelist, ip_blacklist) + reactor = BlocklistingReactorWrapper(reactor, ip_allowlist, ip_blocklist) self._clock = Clock(reactor) self._pool = HTTPConnectionPool(reactor) @@ -120,7 +120,7 @@ class MatrixFederationAgent: if _well_known_resolver is None: _well_known_resolver = WellKnownResolver( reactor, - agent=BlacklistingAgentWrapper( + agent=BlocklistingAgentWrapper( ProxyAgent( reactor, proxy_reactor, @@ -128,7 +128,7 @@ class MatrixFederationAgent: contextFactory=tls_client_options_factory, use_proxy=True, ), - ip_blacklist=ip_blacklist, + ip_blocklist=ip_blocklist, ), user_agent=self.user_agent, ) @@ -256,7 +256,7 @@ class MatrixHostnameEndpoint: Args: reactor: twisted reactor to use for underlying requests proxy_reactor: twisted reactor to use for connections to the proxy server. - 'reactor' might have some blacklisting applied (i.e. for DNS queries), + 'reactor' might have some blocking applied (i.e. for DNS queries), but we need unblocked access to the proxy. tls_client_options_factory: factory to use for fetching client tls options, or none to disable TLS. 
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 634882487c..9094dab0fe 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -64,7 +64,7 @@ from synapse.api.errors import ( from synapse.crypto.context_factory import FederationPolicyForHTTPS from synapse.http import QuieterFileBodyProducer from synapse.http.client import ( - BlacklistingAgentWrapper, + BlocklistingAgentWrapper, BodyExceededMaxSize, ByteWriteable, _make_scheduler, @@ -392,15 +392,15 @@ class MatrixFederationHttpClient: self.reactor, tls_client_options_factory, user_agent.encode("ascii"), - hs.config.server.federation_ip_range_whitelist, - hs.config.server.federation_ip_range_blacklist, + hs.config.server.federation_ip_range_allowlist, + hs.config.server.federation_ip_range_blocklist, ) - # Use a BlacklistingAgentWrapper to prevent circumventing the IP - # blacklist via IP literals in server names - self.agent = BlacklistingAgentWrapper( + # Use a BlocklistingAgentWrapper to prevent circumventing the IP + # blocking via IP literals in server names + self.agent = BlocklistingAgentWrapper( federation_agent, - ip_blacklist=hs.config.server.federation_ip_range_blacklist, + ip_blocklist=hs.config.server.federation_ip_range_blocklist, ) self.clock = hs.get_clock() diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index 94ef737b9e..7bdc4acae7 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -53,7 +53,7 @@ class ProxyAgent(_AgentBase): connections. proxy_reactor: twisted reactor to use for connections to the proxy server - reactor might have some blacklisting applied (i.e. for DNS queries), + reactor might have some blocking applied (i.e. for DNS queries), but we need unblocked access to the proxy. 
contextFactory: A factory for TLS contexts, to control the diff --git a/synapse/http/replicationagent.py b/synapse/http/replicationagent.py index 5ecd08be0f..800f21873d 100644 --- a/synapse/http/replicationagent.py +++ b/synapse/http/replicationagent.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import Optional +from typing import Dict, Optional from zope.interface import implementer @@ -32,6 +32,7 @@ from twisted.web.iweb import ( IResponse, ) +from synapse.config.workers import InstanceLocationConfig from synapse.types import ISynapseReactor logger = logging.getLogger(__name__) @@ -44,9 +45,11 @@ class ReplicationEndpointFactory: def __init__( self, reactor: ISynapseReactor, + instance_map: Dict[str, InstanceLocationConfig], context_factory: IPolicyForHTTPS, ) -> None: self.reactor = reactor + self.instance_map = instance_map self.context_factory = context_factory def endpointForURI(self, uri: URI) -> IStreamClientEndpoint: @@ -58,15 +61,29 @@ class ReplicationEndpointFactory: Returns: The correct client endpoint object """ - if uri.scheme in (b"http", b"https"): - endpoint = HostnameEndpoint(self.reactor, uri.host, uri.port) - if uri.scheme == b"https": + # The given URI has a special scheme and includes the worker name. The + # actual connection details are pulled from the instance map. 
+ worker_name = uri.netloc.decode("utf-8") + scheme = self.instance_map[worker_name].scheme() + + if scheme in ("http", "https"): + endpoint = HostnameEndpoint( + self.reactor, + self.instance_map[worker_name].host, + self.instance_map[worker_name].port, + ) + if scheme == "https": endpoint = wrapClientTLS( - self.context_factory.creatorForNetloc(uri.host, uri.port), endpoint + # The 'port' argument below isn't actually used by the function + self.context_factory.creatorForNetloc( + self.instance_map[worker_name].host, + self.instance_map[worker_name].port, + ), + endpoint, ) return endpoint else: - raise SchemeNotSupported(f"Unsupported scheme: {uri.scheme!r}") + raise SchemeNotSupported(f"Unsupported scheme: {scheme}") @implementer(IAgent) @@ -80,6 +97,7 @@ class ReplicationAgent(_AgentBase): def __init__( self, reactor: ISynapseReactor, + instance_map: Dict[str, InstanceLocationConfig], contextFactory: IPolicyForHTTPS, connectTimeout: Optional[float] = None, bindAddress: Optional[bytes] = None, @@ -102,7 +120,9 @@ class ReplicationAgent(_AgentBase): created. """ _AgentBase.__init__(self, reactor, pool) - endpoint_factory = ReplicationEndpointFactory(reactor, contextFactory) + endpoint_factory = ReplicationEndpointFactory( + reactor, instance_map, contextFactory + ) self._endpointFactory = endpoint_factory def request( diff --git a/synapse/media/url_previewer.py b/synapse/media/url_previewer.py index c8a4a809f1..70b32cee17 100644 --- a/synapse/media/url_previewer.py +++ b/synapse/media/url_previewer.py @@ -105,7 +105,7 @@ class UrlPreviewer: When Synapse is asked to preview a URL it does the following: - 1. Checks against a URL blacklist (defined as `url_preview_url_blacklist` in the + 1. Checks against a URL blocklist (defined as `url_preview_url_blacklist` in the config). 2. Checks the URL against an in-memory cache and returns the result if it exists. (This is also used to de-duplicate processing of multiple in-flight requests at once.) 
@@ -113,7 +113,7 @@ class UrlPreviewer: 1. Checks URL and timestamp against the database cache and returns the result if it has not expired and was successful (a 2xx return code). 2. Checks if the URL matches an oEmbed (https://oembed.com/) pattern. If it - does, update the URL to download. + does and the new URL is not blocked, update the URL to download. 3. Downloads the URL and stores it into a file via the media storage provider and saves the local media metadata. 4. If the media is an image: @@ -127,14 +127,14 @@ class UrlPreviewer: and saves the local media metadata. 2. Convert the oEmbed response to an Open Graph response. 3. Override any Open Graph data from the HTML with data from oEmbed. - 4. If an image exists in the Open Graph response: + 4. If an image URL exists in the Open Graph response: 1. Downloads the URL and stores it into a file via the media storage provider and saves the local media metadata. 2. Generates thumbnails. 3. Updates the Open Graph response based on image properties. - 6. If the media is JSON and an oEmbed URL was found: + 6. If an oEmbed URL was found and the media is JSON: 1. Convert the oEmbed response to an Open Graph response. - 2. If a thumbnail or image is in the oEmbed response: + 2. If an image URL is in the oEmbed response: 1. Downloads the URL and stores it into a file via the media storage provider and saves the local media metadata. 2. Generates thumbnails. @@ -144,7 +144,8 @@ class UrlPreviewer: If any additional requests (e.g. from oEmbed autodiscovery, step 5.3 or image thumbnailing, step 5.4 or 6.4) fails then the URL preview as a whole - does not fail. As much information as possible is returned. + does not fail. If any of them are blocked, then those additional requests + are skipped. As much information as possible is returned. The in-memory cache expires after 1 hour. 
@@ -166,8 +167,8 @@ class UrlPreviewer: self.client = SimpleHttpClient( hs, treq_args={"browser_like_redirects": True}, - ip_whitelist=hs.config.media.url_preview_ip_range_whitelist, - ip_blacklist=hs.config.media.url_preview_ip_range_blacklist, + ip_allowlist=hs.config.media.url_preview_ip_range_allowlist, + ip_blocklist=hs.config.media.url_preview_ip_range_blocklist, use_proxy=True, ) self.media_repo = media_repo @@ -185,7 +186,7 @@ class UrlPreviewer: or instance_running_jobs == hs.get_instance_name() ) - self.url_preview_url_blacklist = hs.config.media.url_preview_url_blacklist + self.url_preview_url_blocklist = hs.config.media.url_preview_url_blocklist self.url_preview_accept_language = hs.config.media.url_preview_accept_language # memory cache mapping urls to an ObservableDeferred returning @@ -203,48 +204,14 @@ class UrlPreviewer: ) async def preview(self, url: str, user: UserID, ts: int) -> bytes: - # XXX: we could move this into _do_preview if we wanted. - url_tuple = urlsplit(url) - for entry in self.url_preview_url_blacklist: - match = True - for attrib in entry: - pattern = entry[attrib] - value = getattr(url_tuple, attrib) - logger.debug( - "Matching attrib '%s' with value '%s' against pattern '%s'", - attrib, - value, - pattern, - ) - - if value is None: - match = False - continue - - # Some attributes might not be parsed as strings by urlsplit (such as the - # port, which is parsed as an int). Because we use match functions that - # expect strings, we want to make sure that's what we give them. 
- value_str = str(value) - - if pattern.startswith("^"): - if not re.match(pattern, value_str): - match = False - continue - else: - if not fnmatch.fnmatch(value_str, pattern): - match = False - continue - if match: - logger.warning("URL %s blocked by url_blacklist entry %s", url, entry) - raise SynapseError( - 403, "URL blocked by url pattern blacklist entry", Codes.UNKNOWN - ) - # the in-memory cache: - # * ensures that only one request is active at a time + # * ensures that only one request to a URL is active at a time # * takes load off the DB for the thundering herds # * also caches any failures (unlike the DB) so we don't keep - # requesting the same endpoint + # requesting the same endpoint + # + # Note that autodiscovered oEmbed URLs and pre-caching of images + # are not captured in the in-memory cache. observable = self._cache.get(url) @@ -283,7 +250,7 @@ class UrlPreviewer: og = og.encode("utf8") return og - # If this URL can be accessed via oEmbed, use that instead. + # If this URL can be accessed via an allowed oEmbed, use that instead. url_to_download = url oembed_url = self._oembed.get_oembed_url(url) if oembed_url: @@ -329,6 +296,7 @@ class UrlPreviewer: # defer to that. oembed_url = self._oembed.autodiscover_from_html(tree) og_from_oembed: JsonDict = {} + # Only download to the oEmbed URL if it is allowed. if oembed_url: try: oembed_info = await self._handle_url( @@ -411,6 +379,59 @@ class UrlPreviewer: return jsonog.encode("utf8") + def _is_url_blocked(self, url: str) -> bool: + """ + Check whether the URL is allowed to be previewed (according to the homeserver + configuration). + + Args: + url: The requested URL. + + Return: + True if the URL is blocked, False if it is allowed. + """ + url_tuple = urlsplit(url) + for entry in self.url_preview_url_blocklist: + match = True + # Iterate over each entry. If *all* attributes of that entry match + # the current URL, then reject it. 
+ for attrib, pattern in entry.items(): + value = getattr(url_tuple, attrib) + logger.debug( + "Matching attrib '%s' with value '%s' against pattern '%s'", + attrib, + value, + pattern, + ) + + if value is None: + match = False + break + + # Some attributes might not be parsed as strings by urlsplit (such as the + # port, which is parsed as an int). Because we use match functions that + # expect strings, we want to make sure that's what we give them. + value_str = str(value) + + # Check the value against the pattern as either a regular expression or + # a glob. If it doesn't match, the entry doesn't match. + if pattern.startswith("^"): + if not re.match(pattern, value_str): + match = False + break + else: + if not fnmatch.fnmatch(value_str, pattern): + match = False + break + + # All fields matched, return true (the URL is blocked). + if match: + logger.warning("URL %s blocked by entry %s", url, entry) + return match + + # No matches were found, the URL is allowed. + return False + async def _download_url(self, url: str, output_stream: BinaryIO) -> DownloadResult: """ Fetches a remote URL and parses the headers. @@ -451,7 +472,7 @@ class UrlPreviewer: except DNSLookupError: # DNS lookup returned no results # Note: This will also be the case if one of the resolved IP - # addresses is blacklisted + # addresses is blocked. raise SynapseError( 502, "DNS resolution failure during URL preview generation", @@ -547,8 +568,16 @@ class UrlPreviewer: Returns: A MediaInfo object describing the fetched content. + + Raises: + SynapseError if the URL is blocked. """ + if self._is_url_blocked(url): + raise SynapseError( + 403, "URL blocked by url pattern blocklist entry", Codes.UNKNOWN + ) + # TODO: we should probably honour robots.txt... except in practice # we're most likely being explicitly triggered by a human rather than a # bot, so are we really a robot? 
@@ -624,7 +653,7 @@ class UrlPreviewer: return # The image URL from the HTML might be relative to the previewed page, - # convert it to an URL which can be requested directly. + # convert it to a URL which can be requested directly. url_parts = urlparse(image_url) if url_parts.scheme != "data": image_url = urljoin(media_info.uri, image_url) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 2c9d181acf..0e9f366cba 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -134,7 +134,7 @@ from synapse.util.caches.descriptors import CachedFunction, cached as _cached from synapse.util.frozenutils import freeze if TYPE_CHECKING: - from synapse.app.generic_worker import GenericWorkerSlavedStore + from synapse.app.generic_worker import GenericWorkerStore from synapse.server import HomeServer @@ -237,9 +237,7 @@ class ModuleApi: # TODO: Fix this type hint once the types for the data stores have been ironed # out. - self._store: Union[ - DataStore, "GenericWorkerSlavedStore" - ] = hs.get_datastores().main + self._store: Union[DataStore, "GenericWorkerStore"] = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self._auth = hs.get_auth() self._auth_handler = auth_handler diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index e91ee05e99..50027680cb 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -143,7 +143,7 @@ class HttpPusher(Pusher): ) self.url = url - self.http_client = hs.get_proxied_blacklisted_http_client() + self.http_client = hs.get_proxied_blocklisted_http_client() self.data_minus_url = {} self.data_minus_url.update(self.data) del self.data_minus_url["url"] diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index dc7820f963..63cf24a14d 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -219,11 +219,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): with 
outgoing_gauge.track_inprogress(): if instance_name == local_instance_name: raise Exception("Trying to send HTTP request to self") - if instance_name in instance_map: - host = instance_map[instance_name].host - port = instance_map[instance_name].port - tls = instance_map[instance_name].tls - else: + if instance_name not in instance_map: raise Exception( "Instance %r not in 'instance_map' config" % (instance_name,) ) @@ -271,13 +267,11 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): "Unknown METHOD on %s replication endpoint" % (cls.NAME,) ) - # Here the protocol is hard coded to be http by default or https in case the replication - # port is set to have tls true. - scheme = "https" if tls else "http" - uri = "%s://%s:%s/_synapse/replication/%s/%s" % ( - scheme, - host, - port, + # Hard code a special scheme to show this only used for replication. The + # instance_name will be passed into the ReplicationEndpointFactory to + # determine connection details from the instance_map. + uri = "synapse-replication://%s/_synapse/replication/%s/%s" % ( + instance_name, cls.NAME, "/".join(url_args), ) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 200f667fdf..139f57cf86 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -60,7 +60,7 @@ _WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 5 class ReplicationDataHandler: """Handles incoming stream updates from replication. - This instance notifies the slave data store about updates. Can be subclassed + This instance notifies the data store about updates. Can be subclassed to handle updates in additional ways. """ @@ -91,7 +91,7 @@ class ReplicationDataHandler: ) -> None: """Called to handle a batch of replication data with a given stream token. - By default this just pokes the slave store. Can be overridden in subclasses to + By default, this just pokes the data store. Can be overridden in subclasses to handle more. 
Args: diff --git a/synapse/rest/admin/devices.py b/synapse/rest/admin/devices.py index 3b2f2d9abb..11ebed9bfd 100644 --- a/synapse/rest/admin/devices.py +++ b/synapse/rest/admin/devices.py @@ -137,6 +137,35 @@ class DevicesRestServlet(RestServlet): devices = await self.device_handler.get_devices_by_user(target_user.to_string()) return HTTPStatus.OK, {"devices": devices, "total": len(devices)} + async def on_POST( + self, request: SynapseRequest, user_id: str + ) -> Tuple[int, JsonDict]: + """Creates a new device for the user.""" + await assert_requester_is_admin(self.auth, request) + + target_user = UserID.from_string(user_id) + if not self.is_mine(target_user): + raise SynapseError( + HTTPStatus.BAD_REQUEST, "Can only create devices for local users" + ) + + u = await self.store.get_user_by_id(target_user.to_string()) + if u is None: + raise NotFoundError("Unknown user") + + body = parse_json_object_from_request(request) + device_id = body.get("device_id") + if not device_id: + raise SynapseError(HTTPStatus.BAD_REQUEST, "Missing device_id") + if not isinstance(device_id, str): + raise SynapseError(HTTPStatus.BAD_REQUEST, "device_id must be a string") + + await self.device_handler.check_device_registered( + user_id=user_id, device_id=device_id + ) + + return HTTPStatus.CREATED, {} + class DeleteDevicesRestServlet(RestServlet): """ diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py index 9bbab5e624..413edd8a4d 100644 --- a/synapse/rest/client/keys.py +++ b/synapse/rest/client/keys.py @@ -287,7 +287,7 @@ class OneTimeKeyServlet(RestServlet): self.e2e_keys_handler = hs.get_e2e_keys_handler() async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: - await self.auth.get_user_by_req(request, allow_guest=True) + requester = await self.auth.get_user_by_req(request, allow_guest=True) timeout = parse_integer(request, "timeout", 10 * 1000) body = parse_json_object_from_request(request) @@ -298,7 +298,7 @@ class 
OneTimeKeyServlet(RestServlet): query.setdefault(user_id, {})[device_id] = {algorithm: 1} result = await self.e2e_keys_handler.claim_one_time_keys( - query, timeout, always_include_fallback_keys=False + query, requester.user, timeout, always_include_fallback_keys=False ) return 200, result @@ -335,7 +335,7 @@ class UnstableOneTimeKeyServlet(RestServlet): self.e2e_keys_handler = hs.get_e2e_keys_handler() async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: - await self.auth.get_user_by_req(request, allow_guest=True) + requester = await self.auth.get_user_by_req(request, allow_guest=True) timeout = parse_integer(request, "timeout", 10 * 1000) body = parse_json_object_from_request(request) @@ -346,7 +346,7 @@ class UnstableOneTimeKeyServlet(RestServlet): query.setdefault(user_id, {})[device_id] = Counter(algorithms) result = await self.e2e_keys_handler.claim_one_time_keys( - query, timeout, always_include_fallback_keys=True + query, requester.user, timeout, always_include_fallback_keys=True ) return 200, result diff --git a/synapse/rest/client/login.py b/synapse/rest/client/login.py index a348720131..6ca61ffbd0 100644 --- a/synapse/rest/client/login.py +++ b/synapse/rest/client/login.py @@ -35,6 +35,7 @@ from synapse.api.errors import ( LoginError, NotApprovedError, SynapseError, + UserDeactivatedError, ) from synapse.api.ratelimiting import Ratelimiter from synapse.api.urls import CLIENT_API_PREFIX @@ -84,14 +85,10 @@ class LoginRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() self.hs = hs + self._main_store = hs.get_datastores().main # JWT configuration variables. self.jwt_enabled = hs.config.jwt.jwt_enabled - self.jwt_secret = hs.config.jwt.jwt_secret - self.jwt_subject_claim = hs.config.jwt.jwt_subject_claim - self.jwt_algorithm = hs.config.jwt.jwt_algorithm - self.jwt_issuer = hs.config.jwt.jwt_issuer - self.jwt_audiences = hs.config.jwt.jwt_audiences # SSO configuration. 
self.saml2_enabled = hs.config.saml2.saml2_enabled @@ -117,13 +114,13 @@ class LoginRestServlet(RestServlet): self._well_known_builder = WellKnownBuilder(hs) self._address_ratelimiter = Ratelimiter( - store=hs.get_datastores().main, + store=self._main_store, clock=hs.get_clock(), rate_hz=self.hs.config.ratelimiting.rc_login_address.per_second, burst_count=self.hs.config.ratelimiting.rc_login_address.burst_count, ) self._account_ratelimiter = Ratelimiter( - store=hs.get_datastores().main, + store=self._main_store, clock=hs.get_clock(), rate_hz=self.hs.config.ratelimiting.rc_login_account.per_second, burst_count=self.hs.config.ratelimiting.rc_login_account.burst_count, @@ -285,6 +282,9 @@ class LoginRestServlet(RestServlet): login_submission, ratelimit=appservice.is_rate_limited(), should_issue_refresh_token=should_issue_refresh_token, + # The user represented by an appservice's configured sender_localpart + # is not actually created in Synapse. + should_check_deactivated=qualified_user_id != appservice.sender, ) async def _do_other_login( @@ -331,6 +331,7 @@ class LoginRestServlet(RestServlet): auth_provider_id: Optional[str] = None, should_issue_refresh_token: bool = False, auth_provider_session_id: Optional[str] = None, + should_check_deactivated: bool = True, ) -> LoginResponse: """Called when we've successfully authed the user and now need to actually login them in (e.g. create devices). This gets called on @@ -350,6 +351,11 @@ class LoginRestServlet(RestServlet): should_issue_refresh_token: True if this login should issue a refresh token alongside the access token. auth_provider_session_id: The session ID got during login from the SSO IdP. + should_check_deactivated: True if the user should be checked for + deactivation status before logging in. + + This exists purely for appservice's configured sender_localpart + which doesn't have an associated user in the database. Returns: Dictionary of account information after successful login. 
@@ -369,6 +375,12 @@ class LoginRestServlet(RestServlet): ) user_id = canonical_uid + # If the account has been deactivated, do not proceed with the login. + if should_check_deactivated: + deactivated = await self._main_store.get_user_deactivated_status(user_id) + if deactivated: + raise UserDeactivatedError("This account has been deactivated") + device_id = login_submission.get("device_id") # If device_id is present, check that device_id is not longer than a reasonable 512 characters @@ -427,7 +439,7 @@ class LoginRestServlet(RestServlet): self, login_submission: JsonDict, should_issue_refresh_token: bool = False ) -> LoginResponse: """ - Handle the final stage of SSO login. + Handle token login. Args: login_submission: The JSON request body. @@ -452,72 +464,24 @@ class LoginRestServlet(RestServlet): async def _do_jwt_login( self, login_submission: JsonDict, should_issue_refresh_token: bool = False ) -> LoginResponse: - token = login_submission.get("token", None) - if token is None: - raise LoginError( - 403, "Token field for JWT is missing", errcode=Codes.FORBIDDEN - ) - - from authlib.jose import JsonWebToken, JWTClaims - from authlib.jose.errors import BadSignatureError, InvalidClaimError, JoseError - - jwt = JsonWebToken([self.jwt_algorithm]) - claim_options = {} - if self.jwt_issuer is not None: - claim_options["iss"] = {"value": self.jwt_issuer, "essential": True} - if self.jwt_audiences is not None: - claim_options["aud"] = {"values": self.jwt_audiences, "essential": True} - - try: - claims = jwt.decode( - token, - key=self.jwt_secret, - claims_cls=JWTClaims, - claims_options=claim_options, - ) - except BadSignatureError: - # We handle this case separately to provide a better error message - raise LoginError( - 403, - "JWT validation failed: Signature verification failed", - errcode=Codes.FORBIDDEN, - ) - except JoseError as e: - # A JWT error occurred, return some info back to the client. 
- raise LoginError( - 403, - "JWT validation failed: %s" % (str(e),), - errcode=Codes.FORBIDDEN, - ) - - try: - claims.validate(leeway=120) # allows 2 min of clock skew - - # Enforce the old behavior which is rolled out in productive - # servers: if the JWT contains an 'aud' claim but none is - # configured, the login attempt will fail - if claims.get("aud") is not None: - if self.jwt_audiences is None or len(self.jwt_audiences) == 0: - raise InvalidClaimError("aud") - except JoseError as e: - raise LoginError( - 403, - "JWT validation failed: %s" % (str(e),), - errcode=Codes.FORBIDDEN, - ) + """ + Handle the custom JWT login. - user = claims.get(self.jwt_subject_claim, None) - if user is None: - raise LoginError(403, "Invalid JWT", errcode=Codes.FORBIDDEN) + Args: + login_submission: The JSON request body. + should_issue_refresh_token: True if this login should issue + a refresh token alongside the access token. - user_id = UserID(user, self.hs.hostname).to_string() - result = await self._complete_login( + Returns: + The body of the JSON response. + """ + user_id = self.hs.get_jwt_handler().validate_login(login_submission) + return await self._complete_login( user_id, login_submission, create_non_existent_users=True, should_issue_refresh_token=should_issue_refresh_token, ) - return result def _get_auth_flow_dict_for_idp(idp: SsoIdentityProvider) -> JsonDict: diff --git a/synapse/rest/client/mutual_rooms.py b/synapse/rest/client/mutual_rooms.py index 38ef4e459f..c99445da30 100644 --- a/synapse/rest/client/mutual_rooms.py +++ b/synapse/rest/client/mutual_rooms.py @@ -12,13 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import logging -from typing import TYPE_CHECKING, Tuple +from http import HTTPStatus +from typing import TYPE_CHECKING, Dict, List, Tuple from synapse.api.errors import Codes, SynapseError from synapse.http.server import HttpServer -from synapse.http.servlet import RestServlet +from synapse.http.servlet import RestServlet, parse_strings_from_args from synapse.http.site import SynapseRequest -from synapse.types import JsonDict, UserID +from synapse.types import JsonDict from ._base import client_patterns @@ -30,11 +31,11 @@ logger = logging.getLogger(__name__) class UserMutualRoomsServlet(RestServlet): """ - GET /uk.half-shot.msc2666/user/mutual_rooms/{user_id} HTTP/1.1 + GET /uk.half-shot.msc2666/user/mutual_rooms?user_id={user_id} HTTP/1.1 """ PATTERNS = client_patterns( - "/uk.half-shot.msc2666/user/mutual_rooms/(?P<user_id>[^/]*)", + "/uk.half-shot.msc2666/user/mutual_rooms$", releases=(), # This is an unstable feature ) @@ -43,17 +44,35 @@ class UserMutualRoomsServlet(RestServlet): self.auth = hs.get_auth() self.store = hs.get_datastores().main - async def on_GET( - self, request: SynapseRequest, user_id: str - ) -> Tuple[int, JsonDict]: - UserID.from_string(user_id) + async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: + # twisted.web.server.Request.args is incorrectly defined as Optional[Any] + args: Dict[bytes, List[bytes]] = request.args # type: ignore + + user_ids = parse_strings_from_args(args, "user_id", required=True) + + if len(user_ids) > 1: + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Duplicate user_id query parameter", + errcode=Codes.INVALID_PARAM, + ) + + # We don't do batching, so a batch token is illegal by default + if b"batch_token" in args: + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Unknown batch_token", + errcode=Codes.INVALID_PARAM, + ) + + user_id = user_ids[0] requester = await self.auth.get_user_by_req(request) if user_id == requester.user.to_string(): raise SynapseError( - code=400, - msg="You cannot 
request a list of shared rooms with yourself", - errcode=Codes.FORBIDDEN, + HTTPStatus.UNPROCESSABLE_ENTITY, + "You cannot request a list of shared rooms with yourself", + errcode=Codes.INVALID_PARAM, ) rooms = await self.store.get_mutual_rooms_between_users( diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index 58c5b07390..32df054f56 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -91,7 +91,7 @@ class VersionsRestServlet(RestServlet): # Implements additional endpoints as described in MSC2432 "org.matrix.msc2432": True, # Implements additional endpoints as described in MSC2666 - "uk.half-shot.msc2666.mutual_rooms": True, + "uk.half-shot.msc2666.query_mutual_rooms": True, # Whether new rooms will be set to encrypted or not (based on presets). "io.element.e2ee_forced.public": self.e2ee_forced_public, "io.element.e2ee_forced.private": self.e2ee_forced_private, diff --git a/synapse/server.py b/synapse/server.py index b307295789..f6e245569c 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -147,6 +147,7 @@ logger = logging.getLogger(__name__) if TYPE_CHECKING: from txredisapi import ConnectionHandler + from synapse.handlers.jwt import JwtHandler from synapse.handlers.oidc import OidcHandler from synapse.handlers.saml import SamlHandler @@ -453,15 +454,15 @@ class HomeServer(metaclass=abc.ABCMeta): return SimpleHttpClient(self, use_proxy=True) @cache_in_self - def get_proxied_blacklisted_http_client(self) -> SimpleHttpClient: + def get_proxied_blocklisted_http_client(self) -> SimpleHttpClient: """ - An HTTP client that uses configured HTTP(S) proxies and blacklists IPs - based on the IP range blacklist/whitelist. + An HTTP client that uses configured HTTP(S) proxies and blocks IPs + based on the configured IP ranges. 
""" return SimpleHttpClient( self, - ip_whitelist=self.config.server.ip_range_whitelist, - ip_blacklist=self.config.server.ip_range_blacklist, + ip_allowlist=self.config.server.ip_range_allowlist, + ip_blocklist=self.config.server.ip_range_blocklist, use_proxy=True, ) @@ -534,6 +535,12 @@ class HomeServer(metaclass=abc.ABCMeta): return SsoHandler(self) @cache_in_self + def get_jwt_handler(self) -> "JwtHandler": + from synapse.handlers.jwt import JwtHandler + + return JwtHandler(self) + + @cache_in_self def get_sync_handler(self) -> SyncHandler: return SyncHandler(self) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 6031095249..9bc0c3b7b9 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -45,6 +45,7 @@ from synapse.events.snapshot import ( UnpersistedEventContextBase, ) from synapse.logging.context import ContextResourceUsage +from synapse.logging.opentracing import tag_args, trace from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet from synapse.state import v1, v2 from synapse.storage.databases.main.events_worker import EventRedactBehaviour @@ -270,6 +271,8 @@ class StateHandler: state = await entry.get_state(self._state_storage_controller, StateFilter.all()) return await self.store.get_joined_hosts(room_id, state, entry) + @trace + @tag_args async def calculate_context_info( self, event: EventBase, @@ -465,6 +468,7 @@ class StateHandler: return await unpersisted_context.persist(event) + @trace @measure_func() async def resolve_state_groups_for_events( self, room_id: str, event_ids: Collection[str], await_full_state: bool = True diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index 9d7a8a792f..7089b0a1d8 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -16,7 +16,6 @@ from typing import ( TYPE_CHECKING, AbstractSet, Any, - Awaitable, Callable, Collection, Dict, @@ -67,6 +66,8 @@ class 
StateStorageController: """ self._partial_state_room_tracker.notify_un_partial_stated(room_id) + @trace + @tag_args async def get_state_group_delta( self, state_group: int ) -> Tuple[Optional[int], Optional[StateMap[str]]]: @@ -84,6 +85,8 @@ class StateStorageController: state_group_delta = await self.stores.state.get_state_group_delta(state_group) return state_group_delta.prev_group, state_group_delta.delta_ids + @trace + @tag_args async def get_state_groups_ids( self, _room_id: str, event_ids: Collection[str], await_full_state: bool = True ) -> Dict[int, MutableStateMap[str]]: @@ -114,6 +117,8 @@ class StateStorageController: return group_to_state + @trace + @tag_args async def get_state_ids_for_group( self, state_group: int, state_filter: Optional[StateFilter] = None ) -> StateMap[str]: @@ -130,6 +135,8 @@ class StateStorageController: return group_to_state[state_group] + @trace + @tag_args async def get_state_groups( self, room_id: str, event_ids: Collection[str] ) -> Dict[int, List[EventBase]]: @@ -165,9 +172,11 @@ class StateStorageController: for group, event_id_map in group_to_ids.items() } - def _get_state_groups_from_groups( + @trace + @tag_args + async def _get_state_groups_from_groups( self, groups: List[int], state_filter: StateFilter - ) -> Awaitable[Dict[int, StateMap[str]]]: + ) -> Dict[int, StateMap[str]]: """Returns the state groups for a given set of groups, filtering on types of state events. @@ -180,9 +189,12 @@ class StateStorageController: Dict of state group to state map. 
""" - return self.stores.state._get_state_groups_from_groups(groups, state_filter) + return await self.stores.state._get_state_groups_from_groups( + groups, state_filter + ) @trace + @tag_args async def get_state_for_events( self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None ) -> Dict[str, StateMap[EventBase]]: @@ -280,6 +292,8 @@ class StateStorageController: return {event: event_to_state[event] for event in event_ids} + @trace + @tag_args async def get_state_for_event( self, event_id: str, state_filter: Optional[StateFilter] = None ) -> StateMap[EventBase]: @@ -303,6 +317,7 @@ class StateStorageController: return state_map[event_id] @trace + @tag_args async def get_state_ids_for_event( self, event_id: str, @@ -333,9 +348,11 @@ class StateStorageController: ) return state_map[event_id] - def get_state_for_groups( + @trace + @tag_args + async def get_state_for_groups( self, groups: Iterable[int], state_filter: Optional[StateFilter] = None - ) -> Awaitable[Dict[int, MutableStateMap[str]]]: + ) -> Dict[int, MutableStateMap[str]]: """Gets the state at each of a list of state groups, optionally filtering by type/state_key @@ -347,7 +364,7 @@ class StateStorageController: Returns: Dict of state group to state map. 
""" - return self.stores.state._get_state_for_groups( + return await self.stores.state._get_state_for_groups( groups, state_filter or StateFilter.all() ) @@ -402,6 +419,8 @@ class StateStorageController: event_id, room_id, prev_group, delta_ids, current_state_ids ) + @trace + @tag_args @cancellable async def get_current_state_ids( self, @@ -442,6 +461,8 @@ class StateStorageController: room_id, on_invalidate=on_invalidate ) + @trace + @tag_args async def get_canonical_alias_for_room(self, room_id: str) -> Optional[str]: """Get canonical alias for room, if any @@ -466,6 +487,8 @@ class StateStorageController: return event.content.get("canonical_alias") + @trace + @tag_args async def get_current_state_deltas( self, prev_stream_id: int, max_stream_id: int ) -> Tuple[int, List[Dict[str, Any]]]: @@ -500,6 +523,7 @@ class StateStorageController: ) @trace + @tag_args async def get_current_state( self, room_id: str, state_filter: Optional[StateFilter] = None ) -> StateMap[EventBase]: @@ -516,6 +540,8 @@ class StateStorageController: return state_map + @trace + @tag_args async def get_current_state_event( self, room_id: str, event_type: str, state_key: str ) -> Optional[EventBase]: @@ -527,6 +553,8 @@ class StateStorageController: ) return state_map.get(key) + @trace + @tag_args async def get_current_hosts_in_room(self, room_id: str) -> AbstractSet[str]: """Get current hosts in room based on current state. @@ -538,6 +566,8 @@ class StateStorageController: return await self.stores.main.get_current_hosts_in_room(room_id) + @trace + @tag_args async def get_current_hosts_in_room_ordered(self, room_id: str) -> List[str]: """Get current hosts in room based on current state. 
@@ -553,6 +583,8 @@ class StateStorageController: return await self.stores.main.get_current_hosts_in_room_ordered(room_id) + @trace + @tag_args async def get_current_hosts_in_room_or_partial_state_approximation( self, room_id: str ) -> Collection[str]: @@ -582,6 +614,8 @@ class StateStorageController: return hosts + @trace + @tag_args async def get_users_in_room_with_profiles( self, room_id: str ) -> Mapping[str, ProfileInfo]: diff --git a/synapse/storage/controllers/stats.py b/synapse/storage/controllers/stats.py index 988e44c6af..2a03528fee 100644 --- a/synapse/storage/controllers/stats.py +++ b/synapse/storage/controllers/stats.py @@ -13,8 +13,7 @@ # limitations under the License. import logging -from collections import Counter -from typing import TYPE_CHECKING, Collection, List, Tuple +from typing import TYPE_CHECKING, Collection, Counter, List, Tuple from synapse.api.errors import SynapseError from synapse.storage.database import LoggingTransaction diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 313cf1a8d0..bdaa508dbe 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -565,9 +565,8 @@ class DatabasePool: # A set of tables that are not safe to use native upserts in. self._unsafe_to_upsert_tables = set(UNIQUE_INDEX_BACKGROUND_UPDATES.keys()) - # We add the user_directory_search table to the blacklist on SQLite - # because the existing search table does not have an index, making it - # unsafe to use native upserts. + # The user_directory_search table is unsafe to use native upserts + # on SQLite because the existing search table does not have an index. 
if isinstance(self.engine, Sqlite3Engine): self._unsafe_to_upsert_tables.add("user_directory_search") diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index a9843f6e17..8f7bdbc61a 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -85,13 +85,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) writers=hs.config.worker.writers.account_data, ) else: + # Multiple writers are not supported for SQLite. + # # We shouldn't be running in worker mode with SQLite, but its useful # to support it for unit tests. - # - # If this process is the writer than we need to use - # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets - # updated over replication. (Multiple writers are not supported for - # SQLite). self._account_data_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index bd07d20171..46fa0a73f9 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -274,11 +274,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore): async def invalidate_cache_and_stream( self, cache_name: str, keys: Tuple[Any, ...] ) -> None: - """Invalidates the cache and adds it to the cache stream so slaves + """Invalidates the cache and adds it to the cache stream so other workers will know to invalidate their caches. - This should only be used to invalidate caches where slaves won't - otherwise know from other replication streams that the cache should + This should only be used to invalidate caches where other workers won't + otherwise have known from other replication streams that the cache should be invalidated. 
""" cache_func = getattr(self, cache_name, None) @@ -297,11 +297,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore): cache_func: CachedFunction, keys: Tuple[Any, ...], ) -> None: - """Invalidates the cache and adds it to the cache stream so slaves + """Invalidates the cache and adds it to the cache stream so other workers will know to invalidate their caches. - This should only be used to invalidate caches where slaves won't - otherwise know from other replication streams that the cache should + This should only be used to invalidate caches where other workers won't + otherwise have known from other replication streams that the cache should be invalidated. """ txn.call_after(cache_func.invalidate, keys) @@ -310,7 +310,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): def _invalidate_all_cache_and_stream( self, txn: LoggingTransaction, cache_func: CachedFunction ) -> None: - """Invalidates the entire cache and adds it to the cache stream so slaves + """Invalidates the entire cache and adds it to the cache stream so other workers will know to invalidate their caches. """ diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 5503621ad6..a67fdb3c22 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -105,8 +105,6 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): is_writer=hs.config.worker.worker_app is None, ) - # Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a - # StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker). 
device_list_max = self._device_list_id_gen.get_current_token() device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict( db_conn, diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index ac19de183c..2681917d0b 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -46,7 +46,7 @@ from synapse.storage.database import ( from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.signatures import SignatureWorkerStore from synapse.storage.engines import PostgresEngine, Sqlite3Engine -from synapse.types import JsonDict +from synapse.types import JsonDict, StrCollection from synapse.util import json_encoder from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache @@ -1584,6 +1584,35 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause)) @trace + async def get_event_ids_with_failed_pull_attempts( + self, event_ids: StrCollection + ) -> Set[str]: + """ + Filter the given list of `event_ids` and return events which have any failed + pull attempts. + + Args: + event_ids: A list of events to filter down. + + Returns: + A filtered down list of `event_ids` that have previous failed pull attempts. 
+ """ + + rows = await self.db_pool.simple_select_many_batch( + table="event_failed_pull_attempts", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=("event_id",), + desc="get_event_ids_with_failed_pull_attempts", + ) + event_ids_with_failed_pull_attempts: Set[str] = { + row["event_id"] for row in rows + } + + return event_ids_with_failed_pull_attempts + + @trace async def get_event_ids_to_not_pull_from_backoff( self, room_id: str, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 0ff3fc7369..a39bc90974 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -213,13 +213,10 @@ class EventsWorkerStore(SQLBaseStore): writers=hs.config.worker.writers.events, ) else: + # Multiple writers are not supported for SQLite. + # # We shouldn't be running in worker mode with SQLite, but its useful # to support it for unit tests. - # - # If this process is the writer than we need to use - # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets - # updated over replication. (Multiple writers are not supported for - # SQLite). 
self._stream_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), @@ -1976,12 +1973,6 @@ class EventsWorkerStore(SQLBaseStore): return rows, to_token, True - async def is_event_after(self, event_id1: str, event_id2: str) -> bool: - """Returns True if event_id1 is after event_id2 in the stream""" - to_1, so_1 = await self.get_event_ordering(event_id1) - to_2, so_2 = await self.get_event_ordering(event_id2) - return (to_1, so_1) > (to_2, so_2) - @cached(max_entries=5000) async def get_event_ordering(self, event_id: str) -> Tuple[int, int]: res = await self.db_pool.simple_select_one( diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py index 50516402f9..da31eb44dc 100644 --- a/synapse/storage/databases/main/filtering.py +++ b/synapse/storage/databases/main/filtering.py @@ -25,6 +25,7 @@ from synapse.storage.database import ( LoggingDatabaseConnection, LoggingTransaction, ) +from synapse.storage.engines import PostgresEngine from synapse.types import JsonDict, UserID from synapse.util.caches.descriptors import cached @@ -40,6 +41,8 @@ class FilteringWorkerStore(SQLBaseStore): hs: "HomeServer", ): super().__init__(database, db_conn, hs) + self.server_name: str = hs.hostname + self.database_engine = database.engine self.db_pool.updates.register_background_index_update( "full_users_filters_unique_idx", index_name="full_users_unique_idx", @@ -48,6 +51,98 @@ class FilteringWorkerStore(SQLBaseStore): unique=True, ) + self.db_pool.updates.register_background_update_handler( + "populate_full_user_id_user_filters", + self.populate_full_user_id_user_filters, + ) + + async def populate_full_user_id_user_filters( + self, progress: JsonDict, batch_size: int + ) -> int: + """ + Background update to populate the column `full_user_id` of the table + user_filters from entries in the column `user_local_part` of the same table + """ + + lower_bound_id = progress.get("lower_bound_id", "") + + def _get_last_id(txn: 
LoggingTransaction) -> Optional[str]: + sql = """ + SELECT user_id FROM user_filters + WHERE user_id > ? + ORDER BY user_id + LIMIT 1 OFFSET 50 + """ + txn.execute(sql, (lower_bound_id,)) + res = txn.fetchone() + if res: + upper_bound_id = res[0] + return upper_bound_id + else: + return None + + def _process_batch( + txn: LoggingTransaction, lower_bound_id: str, upper_bound_id: str + ) -> None: + sql = """ + UPDATE user_filters + SET full_user_id = '@' || user_id || ? + WHERE ? < user_id AND user_id <= ? AND full_user_id IS NULL + """ + txn.execute(sql, (f":{self.server_name}", lower_bound_id, upper_bound_id)) + + def _final_batch(txn: LoggingTransaction, lower_bound_id: str) -> None: + sql = """ + UPDATE user_filters + SET full_user_id = '@' || user_id || ? + WHERE ? < user_id AND full_user_id IS NULL + """ + txn.execute( + sql, + ( + f":{self.server_name}", + lower_bound_id, + ), + ) + + if isinstance(self.database_engine, PostgresEngine): + sql = """ + ALTER TABLE user_filters VALIDATE CONSTRAINT full_user_id_not_null + """ + txn.execute(sql) + + upper_bound_id = await self.db_pool.runInteraction( + "populate_full_user_id_user_filters", _get_last_id + ) + + if upper_bound_id is None: + await self.db_pool.runInteraction( + "populate_full_user_id_user_filters", _final_batch, lower_bound_id + ) + + await self.db_pool.updates._end_background_update( + "populate_full_user_id_user_filters" + ) + return 1 + + await self.db_pool.runInteraction( + "populate_full_user_id_user_filters", + _process_batch, + lower_bound_id, + upper_bound_id, + ) + + progress["lower_bound_id"] = upper_bound_id + + await self.db_pool.runInteraction( + "populate_full_user_id_user_filters", + self.db_pool.updates._background_update_progress_txn, + "populate_full_user_id_user_filters", + progress, + ) + + return 50 + @cached(num_args=2) async def get_user_filter( self, user_localpart: str, filter_id: Union[int, str] diff --git a/synapse/storage/databases/main/metrics.py 
b/synapse/storage/databases/main/metrics.py index 14294a0bb8..595e22982e 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -248,89 +248,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): (count,) = cast(Tuple[int], txn.fetchone()) return count - async def count_r30_users(self) -> Dict[str, int]: - """ - Counts the number of 30 day retained users, defined as:- - * Users who have created their accounts more than 30 days ago - * Where last seen at most 30 days ago - * Where account creation and last_seen are > 30 days apart - - Returns: - A mapping of counts globally as well as broken out by platform. - """ - - def _count_r30_users(txn: LoggingTransaction) -> Dict[str, int]: - thirty_days_in_secs = 86400 * 30 - now = int(self._clock.time()) - thirty_days_ago_in_secs = now - thirty_days_in_secs - - sql = """ - SELECT platform, COUNT(*) FROM ( - SELECT - users.name, platform, users.creation_ts * 1000, - MAX(uip.last_seen) - FROM users - INNER JOIN ( - SELECT - user_id, - last_seen, - CASE - WHEN user_agent LIKE '%%Android%%' THEN 'android' - WHEN user_agent LIKE '%%iOS%%' THEN 'ios' - WHEN user_agent LIKE '%%Electron%%' THEN 'electron' - WHEN user_agent LIKE '%%Mozilla%%' THEN 'web' - WHEN user_agent LIKE '%%Gecko%%' THEN 'web' - ELSE 'unknown' - END - AS platform - FROM user_ips - ) uip - ON users.name = uip.user_id - AND users.appservice_id is NULL - AND users.creation_ts < ? - AND uip.last_seen/1000 > ? 
- AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30 - GROUP BY users.name, platform, users.creation_ts - ) u GROUP BY platform - """ - - results = {} - txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) - - for row in txn: - if row[0] == "unknown": - pass - results[row[0]] = row[1] - - sql = """ - SELECT COUNT(*) FROM ( - SELECT users.name, users.creation_ts * 1000, - MAX(uip.last_seen) - FROM users - INNER JOIN ( - SELECT - user_id, - last_seen - FROM user_ips - ) uip - ON users.name = uip.user_id - AND appservice_id is NULL - AND users.creation_ts < ? - AND uip.last_seen/1000 > ? - AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30 - GROUP BY users.name, users.creation_ts - ) u - """ - - txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) - - (count,) = cast(Tuple[int], txn.fetchone()) - results["all"] = count - - return results - - return await self.db_pool.runInteraction("count_r30_users", _count_r30_users) - async def count_r30v2_users(self) -> Dict[str, int]: """ Counts the number of 30 day retained users, defined as users that: diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py index c4022d2427..65c92bef51 100644 --- a/synapse/storage/databases/main/profile.py +++ b/synapse/storage/databases/main/profile.py @@ -15,9 +15,14 @@ from typing import TYPE_CHECKING, Optional from synapse.api.errors import StoreError from synapse.storage._base import SQLBaseStore -from synapse.storage.database import DatabasePool, LoggingDatabaseConnection +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, +) from synapse.storage.databases.main.roommember import ProfileInfo -from synapse.types import UserID +from synapse.storage.engines import PostgresEngine +from synapse.types import JsonDict, UserID if TYPE_CHECKING: from synapse.server import HomeServer @@ -31,6 +36,8 @@ class ProfileWorkerStore(SQLBaseStore): hs: "HomeServer", 
): super().__init__(database, db_conn, hs) + self.server_name: str = hs.hostname + self.database_engine = database.engine self.db_pool.updates.register_background_index_update( "profiles_full_user_id_key_idx", index_name="profiles_full_user_id_key", @@ -39,6 +46,97 @@ class ProfileWorkerStore(SQLBaseStore): unique=True, ) + self.db_pool.updates.register_background_update_handler( + "populate_full_user_id_profiles", self.populate_full_user_id_profiles + ) + + async def populate_full_user_id_profiles( + self, progress: JsonDict, batch_size: int + ) -> int: + """ + Background update to populate the column `full_user_id` of the table + profiles from entries in the column `user_local_part` of the same table + """ + + lower_bound_id = progress.get("lower_bound_id", "") + + def _get_last_id(txn: LoggingTransaction) -> Optional[str]: + sql = """ + SELECT user_id FROM profiles + WHERE user_id > ? + ORDER BY user_id + LIMIT 1 OFFSET 50 + """ + txn.execute(sql, (lower_bound_id,)) + res = txn.fetchone() + if res: + upper_bound_id = res[0] + return upper_bound_id + else: + return None + + def _process_batch( + txn: LoggingTransaction, lower_bound_id: str, upper_bound_id: str + ) -> None: + sql = """ + UPDATE profiles + SET full_user_id = '@' || user_id || ? + WHERE ? < user_id AND user_id <= ? AND full_user_id IS NULL + """ + txn.execute(sql, (f":{self.server_name}", lower_bound_id, upper_bound_id)) + + def _final_batch(txn: LoggingTransaction, lower_bound_id: str) -> None: + sql = """ + UPDATE profiles + SET full_user_id = '@' || user_id || ? + WHERE ? 
< user_id AND full_user_id IS NULL + """ + txn.execute( + sql, + ( + f":{self.server_name}", + lower_bound_id, + ), + ) + + if isinstance(self.database_engine, PostgresEngine): + sql = """ + ALTER TABLE profiles VALIDATE CONSTRAINT full_user_id_not_null + """ + txn.execute(sql) + + upper_bound_id = await self.db_pool.runInteraction( + "populate_full_user_id_profiles", _get_last_id + ) + + if upper_bound_id is None: + await self.db_pool.runInteraction( + "populate_full_user_id_profiles", _final_batch, lower_bound_id + ) + + await self.db_pool.updates._end_background_update( + "populate_full_user_id_profiles" + ) + return 1 + + await self.db_pool.runInteraction( + "populate_full_user_id_profiles", + _process_batch, + lower_bound_id, + upper_bound_id, + ) + + progress["lower_bound_id"] = upper_bound_id + + await self.db_pool.runInteraction( + "populate_full_user_id_profiles", + self.db_pool.updates._background_update_progress_txn, + "populate_full_user_id_profiles", + progress, + ) + + return 50 + async def get_profileinfo(self, user_localpart: str) -> ProfileInfo: try: profile = await self.db_pool.simple_select_one( diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 074942b167..5ee5c7ad9f 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -85,13 +85,10 @@ class ReceiptsWorkerStore(SQLBaseStore): else: self._can_write_to_receipts = True + # Multiple writers are not supported for SQLite. + # # We shouldn't be running in worker mode with SQLite, but its useful # to support it for unit tests. - # - # If this process is the writer than we need to use - # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets - # updated over replication. (Multiple writers are not supported for - # SQLite). 
self._receipts_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index e068f27a10..ae9c201b87 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1099,7 +1099,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): # `get_joined_hosts` is called with the "current" state group for the # room, and so consecutive calls will be for consecutive state groups # which point to the previous state group. - cache = await self._get_joined_hosts_cache(room_id) # type: ignore[misc] + cache = await self._get_joined_hosts_cache(room_id) # If the state group in the cache matches, we already have the data we need. if state_entry.state_group == cache.state_group: diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index b7d58978de..a0319575f0 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -17,6 +17,7 @@ import re import unicodedata from typing import ( TYPE_CHECKING, + Collection, Iterable, List, Mapping, @@ -45,7 +46,7 @@ from synapse.util.stringutils import non_null_str_or_none if TYPE_CHECKING: from synapse.server import HomeServer -from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules +from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, UserTypes from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, @@ -356,13 +357,30 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): Add all local users to the user directory. 
""" - def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]: - sql = "SELECT user_id FROM %s LIMIT %s" % ( - TEMP_TABLE + "_users", - str(batch_size), - ) - txn.execute(sql) - user_result = cast(List[Tuple[str]], txn.fetchall()) + def _populate_user_directory_process_users_txn( + txn: LoggingTransaction, + ) -> Optional[int]: + if self.database_engine.supports_returning: + # Note: we use an ORDER BY in the SELECT to force usage of an + # index. Otherwise, postgres does a sequential scan that is + # surprisingly slow (I think due to the fact it will read/skip + # over lots of already deleted rows). + sql = f""" + DELETE FROM {TEMP_TABLE + "_users"} + WHERE user_id IN ( + SELECT user_id FROM {TEMP_TABLE + "_users"} ORDER BY user_id LIMIT ? + ) + RETURNING user_id + """ + txn.execute(sql, (batch_size,)) + user_result = cast(List[Tuple[str]], txn.fetchall()) + else: + sql = "SELECT user_id FROM %s ORDER BY user_id LIMIT %s" % ( + TEMP_TABLE + "_users", + str(batch_size), + ) + txn.execute(sql) + user_result = cast(List[Tuple[str]], txn.fetchall()) if not user_result: return None @@ -378,85 +396,81 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): assert count_result is not None progress["remaining"] = count_result[0] - return users_to_work_on - - users_to_work_on = await self.db_pool.runInteraction( - "populate_user_directory_temp_read", _get_next_batch - ) + if not users_to_work_on: + return None - # No more users -- complete the transaction. - if not users_to_work_on: - await self.db_pool.updates._end_background_update( - "populate_user_directory_process_users" + logger.debug( + "Processing the next %d users of %d remaining", + len(users_to_work_on), + progress["remaining"], ) - return 1 - - logger.debug( - "Processing the next %d users of %d remaining" - % (len(users_to_work_on), progress["remaining"]) - ) - # First filter down to users we want to insert into the user directory. 
- users_to_insert = [ - user_id - for user_id in users_to_work_on - if await self.should_include_local_user_in_dir(user_id) - ] + # First filter down to users we want to insert into the user directory. + users_to_insert = self._filter_local_users_for_dir_txn( + txn, users_to_work_on + ) - # Next fetch their profiles. Note that the `user_id` here is the - # *localpart*, and that not all users have profiles. - profile_rows = await self.db_pool.simple_select_many_batch( - table="profiles", - column="user_id", - iterable=[get_localpart_from_id(u) for u in users_to_insert], - retcols=( - "user_id", - "displayname", - "avatar_url", - ), - keyvalues={}, - desc="populate_user_directory_process_users_get_profiles", - ) - profiles = { - f"@{row['user_id']}:{self.server_name}": _UserDirProfile( - f"@{row['user_id']}:{self.server_name}", - row["displayname"], - row["avatar_url"], + # Next fetch their profiles. Note that the `user_id` here is the + # *localpart*, and that not all users have profiles. + profile_rows = self.db_pool.simple_select_many_txn( + txn, + table="profiles", + column="user_id", + iterable=[get_localpart_from_id(u) for u in users_to_insert], + retcols=( + "user_id", + "displayname", + "avatar_url", + ), + keyvalues={}, ) - for row in profile_rows - } + profiles = { + f"@{row['user_id']}:{self.server_name}": _UserDirProfile( + f"@{row['user_id']}:{self.server_name}", + row["displayname"], + row["avatar_url"], + ) + for row in profile_rows + } - profiles_to_insert = [ - profiles.get(user_id) or _UserDirProfile(user_id) - for user_id in users_to_insert - ] + profiles_to_insert = [ + profiles.get(user_id) or _UserDirProfile(user_id) + for user_id in users_to_insert + ] + + # Actually insert the users with their profiles into the directory. + self._update_profiles_in_user_dir_txn(txn, profiles_to_insert) + + # We've finished processing the users. Delete it from the table, if + # we haven't already. 
+ if not self.database_engine.supports_returning: + self.db_pool.simple_delete_many_txn( + txn, + table=TEMP_TABLE + "_users", + column="user_id", + values=users_to_work_on, + keyvalues={}, + ) - # Actually insert the users with their profiles into the directory. - await self.db_pool.runInteraction( - "populate_user_directory_process_users_insertion", - self._update_profiles_in_user_dir_txn, - profiles_to_insert, - ) + # Update the remaining counter. + progress["remaining"] -= len(users_to_work_on) + self.db_pool.updates._background_update_progress_txn( + txn, "populate_user_directory_process_users", progress + ) + return len(users_to_work_on) - # We've finished processing the users. Delete it from the table. - await self.db_pool.simple_delete_many( - table=TEMP_TABLE + "_users", - column="user_id", - iterable=users_to_work_on, - keyvalues={}, - desc="populate_user_directory_process_users_delete", + processed_count = await self.db_pool.runInteraction( + "populate_user_directory_temp", _populate_user_directory_process_users_txn ) - # Update the remaining counter. - progress["remaining"] -= len(users_to_work_on) - await self.db_pool.runInteraction( - "populate_user_directory", - self.db_pool.updates._background_update_progress_txn, - "populate_user_directory_process_users", - progress, - ) + # No more users -- complete the transaction. + if not processed_count: + await self.db_pool.updates._end_background_update( + "populate_user_directory_process_users" + ) + return 1 - return len(users_to_work_on) + return processed_count async def should_include_local_user_in_dir(self, user: str) -> bool: """Certain classes of local user are omitted from the user directory. 
@@ -494,6 +508,30 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): return True + def _filter_local_users_for_dir_txn( + self, txn: LoggingTransaction, users: Collection[str] + ) -> Collection[str]: + """A batched version of `should_include_local_user_in_dir`""" + users = [ + user + for user in users + if self.get_app_service_by_user_id(user) is None # type: ignore[attr-defined] + and not self.get_if_app_services_interested_in_user(user) # type: ignore[attr-defined] + ] + + rows = self.db_pool.simple_select_many_txn( + txn, + table="users", + column="name", + iterable=users, + keyvalues={ + "deactivated": 0, + }, + retcols=("name", "user_type"), + ) + + return [row["name"] for row in rows if row["user_type"] != UserTypes.SUPPORT] + async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool: """Check if the room is either world_readable or publically joinable""" diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index 097dea5182..86eb1a8a08 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -15,6 +15,7 @@ import logging from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union +from synapse.logging.opentracing import tag_args, trace from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -40,6 +41,8 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): updates. 
""" + @trace + @tag_args def _count_state_group_hops_txn( self, txn: LoggingTransaction, state_group: int ) -> int: @@ -83,6 +86,8 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): return count + @trace + @tag_args def _get_state_groups_from_groups_txn( self, txn: LoggingTransaction, diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index 29ff64e876..6984d11352 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -20,6 +20,7 @@ import attr from synapse.api.constants import EventTypes from synapse.events import EventBase from synapse.events.snapshot import UnpersistedEventContext, UnpersistedEventContextBase +from synapse.logging.opentracing import tag_args, trace from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -159,6 +160,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): "get_state_group_delta", _get_state_group_delta_txn ) + @trace + @tag_args @cancellable async def _get_state_groups_from_groups( self, groups: List[int], state_filter: StateFilter @@ -187,6 +190,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): return results + @trace + @tag_args def _get_state_for_group_using_cache( self, cache: DictionaryCache[int, StateKey, str], @@ -239,6 +244,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): return state_filter.filter_state(state_dict_ids), not missing_types + @trace + @tag_args @cancellable async def _get_state_for_groups( self, groups: Iterable[int], state_filter: Optional[StateFilter] = None @@ -305,6 +312,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): return state + @trace + @tag_args def _get_state_for_groups_using_cache( self, groups: Iterable[int], @@ -403,6 +412,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): fetched_keys=non_member_types, ) + @trace + @tag_args async def 
store_state_deltas_for_batched( self, events_and_context: List[Tuple[EventBase, UnpersistedEventContextBase]], @@ -520,6 +531,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): prev_group, ) + @trace + @tag_args async def store_state_group( self, event_id: str, @@ -772,6 +785,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): ((sg,) for sg in state_groups_to_delete), ) + @trace + @tag_args async def get_previous_state_groups( self, state_groups: Iterable[int] ) -> Dict[int, int]: diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 1672976209..df2cc31ca6 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 76 # remember to update the list below when updating +SCHEMA_VERSION = 77 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -100,13 +100,19 @@ Changes in SCHEMA_VERSION = 75: Changes in SCHEMA_VERSION = 76: - Adds a full_user_id column to tables profiles and user_filters. + +Changes in SCHEMA_VERSION = 77 + - (Postgres) Add NOT VALID CHECK (full_user_id IS NOT NULL) to tables profiles and user_filters """ SCHEMA_COMPAT_VERSION = ( # Queries against `event_stream_ordering` columns in membership tables must # be disambiguated. 
- 74 + # + # insertions to the column `full_user_id` of tables profiles and user_filters can no + # longer be null + 76 ) """Limit on how far the synapse codebase can be rolled back without breaking db compat diff --git a/synapse/storage/schema/main/delta/34/cache_stream.py b/synapse/storage/schema/main/delta/34/cache_stream.py index 682c86da1a..882f9b893b 100644 --- a/synapse/storage/schema/main/delta/34/cache_stream.py +++ b/synapse/storage/schema/main/delta/34/cache_stream.py @@ -21,7 +21,7 @@ from synapse.storage.prepare_database import get_statements logger = logging.getLogger(__name__) -# This stream is used to notify replication slaves that some caches have +# This stream is used to notify workers over replication that some caches have # been invalidated that they cannot infer from the other streams. CREATE_TABLE = """ CREATE TABLE cache_invalidation_stream ( diff --git a/synapse/storage/schema/main/delta/77/01_add_profiles_not_valid_check.sql.postgres b/synapse/storage/schema/main/delta/77/01_add_profiles_not_valid_check.sql.postgres new file mode 100644 index 0000000000..3eb226c648 --- /dev/null +++ b/synapse/storage/schema/main/delta/77/01_add_profiles_not_valid_check.sql.postgres @@ -0,0 +1,16 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +ALTER TABLE profiles ADD CONSTRAINT full_user_id_not_null CHECK (full_user_id IS NOT NULL) NOT VALID; \ No newline at end of file diff --git a/synapse/storage/schema/main/delta/77/02_add_user_filters_not_valid_check.sql.postgres b/synapse/storage/schema/main/delta/77/02_add_user_filters_not_valid_check.sql.postgres new file mode 100644 index 0000000000..ba037daf47 --- /dev/null +++ b/synapse/storage/schema/main/delta/77/02_add_user_filters_not_valid_check.sql.postgres @@ -0,0 +1,16 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE user_filters ADD CONSTRAINT full_user_id_not_null CHECK (full_user_id IS NOT NULL) NOT VALID; \ No newline at end of file diff --git a/synapse/storage/schema/main/delta/77/03bg_populate_full_user_id_profiles.sql b/synapse/storage/schema/main/delta/77/03bg_populate_full_user_id_profiles.sql new file mode 100644 index 0000000000..12101ab914 --- /dev/null +++ b/synapse/storage/schema/main/delta/77/03bg_populate_full_user_id_profiles.sql @@ -0,0 +1,16 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES (7703, 'populate_full_user_id_profiles', '{}'); \ No newline at end of file diff --git a/synapse/storage/schema/main/delta/77/04bg_populate_full_user_id_user_filters.sql b/synapse/storage/schema/main/delta/77/04bg_populate_full_user_id_user_filters.sql new file mode 100644 index 0000000000..1f4d683cac --- /dev/null +++ b/synapse/storage/schema/main/delta/77/04bg_populate_full_user_id_user_filters.sql @@ -0,0 +1,16 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES (7704, 'populate_full_user_id_user_filters', '{}'); \ No newline at end of file diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 325219656a..42baf8ac6b 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -84,7 +84,15 @@ JsonSerializable = object # Collection[str] that does not include str itself; str being a Sequence[str] # is very misleading and results in bugs. +# +# StrCollection is an unordered collection of strings. If ordering is important, +# StrSequence can be used instead. StrCollection = Union[Tuple[str, ...], List[str], AbstractSet[str]] +# Sequence[str] that does not include str itself; str being a Sequence[str] +# is very misleading and results in bugs. +# +# Unlike StrCollection, StrSequence is an ordered collection of strings. +StrSequence = Union[Tuple[str, ...], List[str]] # Note that this seems to require inheriting *directly* from Interface in order diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 81df71a0c5..8514a75a1c 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -220,7 +220,9 @@ class DeferredCacheDescriptor(_CacheDescriptorBase): self.iterable = iterable self.prune_unread_entries = prune_unread_entries - def __get__(self, obj: Optional[Any], owner: Optional[Type]) -> Callable[..., Any]: + def __get__( + self, obj: Optional[Any], owner: Optional[Type] + ) -> Callable[..., "defer.Deferred[Any]"]: cache: DeferredCache[CacheKey, Any] = DeferredCache( name=self.name, max_entries=self.max_entries, @@ -232,7 +234,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase): get_cache_key = self.cache_key_builder @functools.wraps(self.orig) - def _wrapped(*args: Any, **kwargs: Any) -> Any: + def _wrapped(*args: Any, **kwargs: Any) -> "defer.Deferred[Any]": # If we're passed a cache_context then we'll want to call its invalidate() # 
whenever we are invalidated invalidate_callback = kwargs.pop("on_invalidate", None) diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py index 4938ddf703..a0efb96d3b 100644 --- a/synapse/util/iterutils.py +++ b/synapse/util/iterutils.py @@ -15,11 +15,13 @@ import heapq from itertools import islice from typing import ( + Callable, Collection, Dict, Generator, Iterable, Iterator, + List, Mapping, Set, Sized, @@ -71,6 +73,31 @@ def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]: return (iseq[i : i + maxlen] for i in range(0, len(iseq), maxlen)) +def partition( + iterable: Iterable[T], predicate: Callable[[T], bool] +) -> Tuple[List[T], List[T]]: + """ + Separate a given iterable into two lists based on the result of a predicate function. + + Args: + iterable: the iterable to partition (separate) + predicate: a function that takes an item from the iterable and returns a boolean + + Returns: + A tuple of two lists, the first containing all items for which the predicate + returned True, the second containing all items for which the predicate returned + False + """ + true_results = [] + false_results = [] + for item in iterable: + if predicate(item): + true_results.append(item) + else: + false_results.append(item) + return true_results, false_results + + def sorted_topologically( nodes: Iterable[T], graph: Mapping[T, Collection[T]], diff --git a/synapse/util/module_loader.py b/synapse/util/module_loader.py index 5a638c6e9a..e3a54df48b 100644 --- a/synapse/util/module_loader.py +++ b/synapse/util/module_loader.py @@ -14,17 +14,17 @@ import importlib import importlib.util -import itertools from types import ModuleType -from typing import Any, Iterable, Tuple, Type +from typing import Any, Tuple, Type import jsonschema from synapse.config._base import ConfigError from synapse.config._util import json_error_to_config_error +from synapse.types import StrSequence -def load_module(provider: dict, config_path: Iterable[str]) -> Tuple[Type, Any]: +def 
load_module(provider: dict, config_path: StrSequence) -> Tuple[Type, Any]: """Loads a synapse module with its config Args: @@ -39,9 +39,7 @@ def load_module(provider: dict, config_path: Iterable[str]) -> Tuple[Type, Any]: modulename = provider.get("module") if not isinstance(modulename, str): - raise ConfigError( - "expected a string", path=itertools.chain(config_path, ("module",)) - ) + raise ConfigError("expected a string", path=tuple(config_path) + ("module",)) # We need to import the module, and then pick the class out of # that, so we split based on the last dot. @@ -55,19 +53,17 @@ def load_module(provider: dict, config_path: Iterable[str]) -> Tuple[Type, Any]: try: provider_config = provider_class.parse_config(module_config) except jsonschema.ValidationError as e: - raise json_error_to_config_error( - e, itertools.chain(config_path, ("config",)) - ) + raise json_error_to_config_error(e, tuple(config_path) + ("config",)) except ConfigError as e: raise _wrap_config_error( "Failed to parse config for module %r" % (modulename,), - prefix=itertools.chain(config_path, ("config",)), + prefix=tuple(config_path) + ("config",), e=e, ) except Exception as e: raise ConfigError( "Failed to parse config for module %r" % (modulename,), - path=itertools.chain(config_path, ("config",)), + path=tuple(config_path) + ("config",), ) from e else: provider_config = module_config @@ -92,9 +88,7 @@ def load_python_module(location: str) -> ModuleType: return mod -def _wrap_config_error( - msg: str, prefix: Iterable[str], e: ConfigError -) -> "ConfigError": +def _wrap_config_error(msg: str, prefix: StrSequence, e: ConfigError) -> "ConfigError": """Wrap a relative ConfigError with a new path This is useful when we have a ConfigError with a relative path due to a problem @@ -102,7 +96,7 @@ def _wrap_config_error( """ path = prefix if e.path: - path = itertools.chain(prefix, e.path) + path = tuple(prefix) + tuple(e.path) e1 = ConfigError(msg, path) diff --git 
a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index f262bf95a0..2ad55ac13e 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -25,10 +25,12 @@ from typing import ( Iterator, List, Mapping, + MutableSet, Optional, Set, Tuple, ) +from weakref import WeakSet from prometheus_client.core import Counter from typing_extensions import ContextManager @@ -86,7 +88,9 @@ queue_wait_timer = Histogram( ) -_rate_limiter_instances: Set["FederationRateLimiter"] = set() +# This must be a `WeakSet`, otherwise we indirectly hold on to entire `HomeServer`s +# during trial test runs and leak a lot of memory. +_rate_limiter_instances: MutableSet["FederationRateLimiter"] = WeakSet() # Protects the _rate_limiter_instances set from concurrent access _rate_limiter_instances_lock = threading.Lock() |