diff options
Diffstat (limited to 'synapse')
41 files changed, 974 insertions, 414 deletions
diff --git a/synapse/api/errors.py b/synapse/api/errors.py index af894243f8..3546aaf7c3 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -217,6 +217,13 @@ class InvalidAPICallError(SynapseError): super().__init__(HTTPStatus.BAD_REQUEST, msg, Codes.BAD_JSON) +class InvalidProxyCredentialsError(SynapseError): + """Error raised when the proxy credentials are invalid.""" + + def __init__(self, msg: str, errcode: str = Codes.UNKNOWN): + super().__init__(401, msg, errcode) + + class ProxiedRequestError(SynapseError): """An error from a general matrix endpoint, eg. from a proxied Matrix API call. diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py index 25c105a4c8..e7662d5b99 100644 --- a/synapse/api/room_versions.py +++ b/synapse/api/room_versions.py @@ -78,36 +78,29 @@ class RoomVersion: # MSC2209: Check 'notifications' key while verifying # m.room.power_levels auth rules. limit_notifications_power_levels: bool - # MSC2175: No longer include the creator in m.room.create events. - msc2175_implicit_room_creator: bool - # MSC2174/MSC2176: Apply updated redaction rules algorithm, move redacts to - # content property. - msc2176_redaction_rules: bool - # MSC3083: Support the 'restricted' join_rule. - msc3083_join_rules: bool - # MSC3375: Support for the proper redaction rules for MSC3083. This mustn't - # be enabled if MSC3083 is not. - msc3375_redaction_rules: bool - # MSC2403: Allows join_rules to be set to 'knock', changes auth rules to allow sending - # m.room.membership event with membership 'knock'. - msc2403_knocking: bool + # No longer include the creator in m.room.create events. + implicit_room_creator: bool + # Apply updated redaction rules algorithm from room version 11. + updated_redaction_rules: bool + # Support the 'restricted' join rule. + restricted_join_rule: bool + # Support for the proper redaction rules for the restricted join rule. This requires + # restricted_join_rule to be enabled. + restricted_join_rule_fix: bool + # Support the 'knock' join rule. + knock_join_rule: bool # MSC3389: Protect relation information from redaction. msc3389_relation_redactions: bool - # MSC3787: Adds support for a `knock_restricted` join rule, mixing concepts of - # knocks and restricted join rules into the same join condition. - msc3787_knock_restricted_join_rule: bool - # MSC3667: Enforce integer power levels - msc3667_int_only_power_levels: bool - # MSC3821: Do not redact the third_party_invite content field for membership events. - msc3821_redaction_rules: bool + # Support the 'knock_restricted' join rule. + knock_restricted_join_rule: bool + # Enforce integer power levels + enforce_int_power_levels: bool # MSC3931: Adds a push rule condition for "room version feature flags", making # some push rules room version dependent. Note that adding a flag to this list # is not enough to mark it "supported": the push rule evaluator also needs to # support the flag. Unknown flags are ignored by the evaluator, making conditions # fail if used. msc3931_push_features: Tuple[str, ...] # values from PushRuleRoomFlag - # MSC3989: Redact the origin field. - msc3989_redaction_rules: bool class RoomVersions: @@ -120,17 +113,15 @@ class RoomVersions: special_case_aliases_auth=True, strict_canonicaljson=False, limit_notifications_power_levels=False, - msc2175_implicit_room_creator=False, - msc2176_redaction_rules=False, - msc3083_join_rules=False, - msc3375_redaction_rules=False, - msc2403_knocking=False, + implicit_room_creator=False, + updated_redaction_rules=False, + restricted_join_rule=False, + restricted_join_rule_fix=False, + knock_join_rule=False, msc3389_relation_redactions=False, - msc3787_knock_restricted_join_rule=False, - msc3667_int_only_power_levels=False, - msc3821_redaction_rules=False, + knock_restricted_join_rule=False, + enforce_int_power_levels=False, msc3931_push_features=(), - msc3989_redaction_rules=False, ) V2 = RoomVersion( "2", @@ -141,17 +132,15 @@ class RoomVersions: special_case_aliases_auth=True, strict_canonicaljson=False, limit_notifications_power_levels=False, - msc2175_implicit_room_creator=False, - msc2176_redaction_rules=False, - msc3083_join_rules=False, - msc3375_redaction_rules=False, - msc2403_knocking=False, + implicit_room_creator=False, + updated_redaction_rules=False, + restricted_join_rule=False, + restricted_join_rule_fix=False, + knock_join_rule=False, msc3389_relation_redactions=False, - msc3787_knock_restricted_join_rule=False, - msc3667_int_only_power_levels=False, - msc3821_redaction_rules=False, + knock_restricted_join_rule=False, + enforce_int_power_levels=False, msc3931_push_features=(), - msc3989_redaction_rules=False, ) V3 = RoomVersion( "3", @@ -162,17 +151,15 @@ class RoomVersions: special_case_aliases_auth=True, strict_canonicaljson=False, limit_notifications_power_levels=False, - msc2175_implicit_room_creator=False, - msc2176_redaction_rules=False, - msc3083_join_rules=False, - msc3375_redaction_rules=False, - msc2403_knocking=False, + implicit_room_creator=False, + updated_redaction_rules=False, + restricted_join_rule=False, + restricted_join_rule_fix=False, + knock_join_rule=False, msc3389_relation_redactions=False, - msc3787_knock_restricted_join_rule=False, - msc3667_int_only_power_levels=False, - msc3821_redaction_rules=False, + knock_restricted_join_rule=False, + enforce_int_power_levels=False, msc3931_push_features=(), - msc3989_redaction_rules=False, ) V4 = RoomVersion( "4", @@ -183,17 +170,15 @@ class RoomVersions: special_case_aliases_auth=True, strict_canonicaljson=False, limit_notifications_power_levels=False, - msc2175_implicit_room_creator=False, - msc2176_redaction_rules=False, - msc3083_join_rules=False, - msc3375_redaction_rules=False, - msc2403_knocking=False, + implicit_room_creator=False, + updated_redaction_rules=False, + restricted_join_rule=False, + restricted_join_rule_fix=False, + knock_join_rule=False, msc3389_relation_redactions=False, - msc3787_knock_restricted_join_rule=False, - msc3667_int_only_power_levels=False, - msc3821_redaction_rules=False, + knock_restricted_join_rule=False, + enforce_int_power_levels=False, msc3931_push_features=(), - msc3989_redaction_rules=False, ) V5 = RoomVersion( "5", @@ -204,17 +189,15 @@ class RoomVersions: special_case_aliases_auth=True, strict_canonicaljson=False, limit_notifications_power_levels=False, - msc2175_implicit_room_creator=False, - msc2176_redaction_rules=False, - msc3083_join_rules=False, - msc3375_redaction_rules=False, - msc2403_knocking=False, + implicit_room_creator=False, + updated_redaction_rules=False, + restricted_join_rule=False, + restricted_join_rule_fix=False, + knock_join_rule=False, msc3389_relation_redactions=False, - msc3787_knock_restricted_join_rule=False, - msc3667_int_only_power_levels=False, - msc3821_redaction_rules=False, + knock_restricted_join_rule=False, + enforce_int_power_levels=False, msc3931_push_features=(), - msc3989_redaction_rules=False, ) V6 = RoomVersion( "6", @@ -225,38 +208,15 @@ class RoomVersions: special_case_aliases_auth=False, strict_canonicaljson=True, limit_notifications_power_levels=True, - msc2175_implicit_room_creator=False, - msc2176_redaction_rules=False, - msc3083_join_rules=False, - msc3375_redaction_rules=False, - msc2403_knocking=False, + implicit_room_creator=False, + updated_redaction_rules=False, + restricted_join_rule=False, + restricted_join_rule_fix=False, + knock_join_rule=False, msc3389_relation_redactions=False, - msc3787_knock_restricted_join_rule=False, - msc3667_int_only_power_levels=False, - msc3821_redaction_rules=False, + knock_restricted_join_rule=False, + enforce_int_power_levels=False, msc3931_push_features=(), - msc3989_redaction_rules=False, - ) - MSC2176 = RoomVersion( - "org.matrix.msc2176", - RoomDisposition.UNSTABLE, - EventFormatVersions.ROOM_V4_PLUS, - StateResolutionVersions.V2, - enforce_key_validity=True, - special_case_aliases_auth=False, - strict_canonicaljson=True, - limit_notifications_power_levels=True, - msc2175_implicit_room_creator=False, - msc2176_redaction_rules=True, - msc3083_join_rules=False, - msc3375_redaction_rules=False, - msc2403_knocking=False, - msc3389_relation_redactions=False, - msc3787_knock_restricted_join_rule=False, - msc3667_int_only_power_levels=False, - msc3821_redaction_rules=False, - msc3931_push_features=(), - msc3989_redaction_rules=False, ) V7 = RoomVersion( "7", @@ -267,17 +227,15 @@ class RoomVersions: special_case_aliases_auth=False, strict_canonicaljson=True, limit_notifications_power_levels=True, - msc2175_implicit_room_creator=False, - msc2176_redaction_rules=False, - msc3083_join_rules=False, - msc3375_redaction_rules=False, - msc2403_knocking=True, + implicit_room_creator=False, + updated_redaction_rules=False, + restricted_join_rule=False, + restricted_join_rule_fix=False, + knock_join_rule=True, msc3389_relation_redactions=False, - msc3787_knock_restricted_join_rule=False, - msc3667_int_only_power_levels=False, - msc3821_redaction_rules=False, + knock_restricted_join_rule=False, + enforce_int_power_levels=False, msc3931_push_features=(), - msc3989_redaction_rules=False, ) V8 = RoomVersion( "8", @@ -288,17 +246,15 @@ class RoomVersions: special_case_aliases_auth=False, strict_canonicaljson=True, limit_notifications_power_levels=True, - msc2175_implicit_room_creator=False, - msc2176_redaction_rules=False, - msc3083_join_rules=True, - msc3375_redaction_rules=False, - msc2403_knocking=True, + implicit_room_creator=False, + updated_redaction_rules=False, + restricted_join_rule=True, + restricted_join_rule_fix=False, + knock_join_rule=True, msc3389_relation_redactions=False, - msc3787_knock_restricted_join_rule=False, - msc3667_int_only_power_levels=False, - msc3821_redaction_rules=False, + knock_restricted_join_rule=False, + enforce_int_power_levels=False, msc3931_push_features=(), - msc3989_redaction_rules=False, ) V9 = RoomVersion( "9", @@ -309,59 +265,15 @@ class RoomVersions: special_case_aliases_auth=False, strict_canonicaljson=True, limit_notifications_power_levels=True, - msc2175_implicit_room_creator=False, - msc2176_redaction_rules=False, - msc3083_join_rules=True, - msc3375_redaction_rules=True, - msc2403_knocking=True, - msc3389_relation_redactions=False, - msc3787_knock_restricted_join_rule=False, - msc3667_int_only_power_levels=False, - msc3821_redaction_rules=False, - msc3931_push_features=(), - msc3989_redaction_rules=False, - ) - MSC3787 = RoomVersion( - "org.matrix.msc3787", - RoomDisposition.UNSTABLE, - EventFormatVersions.ROOM_V4_PLUS, - StateResolutionVersions.V2, - enforce_key_validity=True, - special_case_aliases_auth=False, - strict_canonicaljson=True, - limit_notifications_power_levels=True, - msc2175_implicit_room_creator=False, - msc2176_redaction_rules=False, - msc3083_join_rules=True, - msc3375_redaction_rules=True, - msc2403_knocking=True, - msc3389_relation_redactions=False, - msc3787_knock_restricted_join_rule=True, - msc3667_int_only_power_levels=False, - msc3821_redaction_rules=False, - msc3931_push_features=(), - msc3989_redaction_rules=False, - ) - MSC3821 = RoomVersion( - "org.matrix.msc3821.opt1", - RoomDisposition.UNSTABLE, - EventFormatVersions.ROOM_V4_PLUS, - StateResolutionVersions.V2, - enforce_key_validity=True, - special_case_aliases_auth=False, - strict_canonicaljson=True, - limit_notifications_power_levels=True, - msc2175_implicit_room_creator=False, - msc2176_redaction_rules=False, - msc3083_join_rules=True, - msc3375_redaction_rules=True, - msc2403_knocking=True, + implicit_room_creator=False, + updated_redaction_rules=False, + restricted_join_rule=True, + restricted_join_rule_fix=True, + knock_join_rule=True, msc3389_relation_redactions=False, - msc3787_knock_restricted_join_rule=False, - msc3667_int_only_power_levels=False, - msc3821_redaction_rules=True, + knock_restricted_join_rule=False, + enforce_int_power_levels=False, msc3931_push_features=(), - msc3989_redaction_rules=False, ) V10 = RoomVersion( "10", @@ -372,17 +284,15 @@ class RoomVersions: special_case_aliases_auth=False, strict_canonicaljson=True, limit_notifications_power_levels=True, - msc2175_implicit_room_creator=False, - msc2176_redaction_rules=False, - msc3083_join_rules=True, - msc3375_redaction_rules=True, - msc2403_knocking=True, + implicit_room_creator=False, + updated_redaction_rules=False, + restricted_join_rule=True, + restricted_join_rule_fix=True, + knock_join_rule=True, msc3389_relation_redactions=False, - msc3787_knock_restricted_join_rule=True, - msc3667_int_only_power_levels=True, - msc3821_redaction_rules=False, + knock_restricted_join_rule=True, + enforce_int_power_levels=True, msc3931_push_features=(), - msc3989_redaction_rules=False, ) MSC1767v10 = RoomVersion( # MSC1767 (Extensible Events) based on room version "10" @@ -394,60 +304,34 @@ class RoomVersions: special_case_aliases_auth=False, strict_canonicaljson=True, limit_notifications_power_levels=True, - msc2175_implicit_room_creator=False, - msc2176_redaction_rules=False, - msc3083_join_rules=True, - msc3375_redaction_rules=True, - msc2403_knocking=True, + implicit_room_creator=False, + updated_redaction_rules=False, + restricted_join_rule=True, + restricted_join_rule_fix=True, + knock_join_rule=True, msc3389_relation_redactions=False, - msc3787_knock_restricted_join_rule=True, - msc3667_int_only_power_levels=True, - msc3821_redaction_rules=False, + knock_restricted_join_rule=True, + enforce_int_power_levels=True, msc3931_push_features=(PushRuleRoomFlag.EXTENSIBLE_EVENTS,), - msc3989_redaction_rules=False, ) - MSC3989 = RoomVersion( - "org.matrix.msc3989", - RoomDisposition.UNSTABLE, - EventFormatVersions.ROOM_V4_PLUS, - StateResolutionVersions.V2, - enforce_key_validity=True, - special_case_aliases_auth=False, - strict_canonicaljson=True, - limit_notifications_power_levels=True, - msc2175_implicit_room_creator=False, - msc2176_redaction_rules=False, - msc3083_join_rules=True, - msc3375_redaction_rules=True, - msc2403_knocking=True, - msc3389_relation_redactions=False, - msc3787_knock_restricted_join_rule=True, - msc3667_int_only_power_levels=True, - msc3821_redaction_rules=False, - msc3931_push_features=(), - msc3989_redaction_rules=True, - ) - MSC3820opt2 = RoomVersion( - # Based upon v10 - "org.matrix.msc3820.opt2", - RoomDisposition.UNSTABLE, + V11 = RoomVersion( + "11", + RoomDisposition.STABLE, EventFormatVersions.ROOM_V4_PLUS, StateResolutionVersions.V2, enforce_key_validity=True, special_case_aliases_auth=False, strict_canonicaljson=True, limit_notifications_power_levels=True, - msc2175_implicit_room_creator=True, # Used by MSC3820 - msc2176_redaction_rules=True, # Used by MSC3820 - msc3083_join_rules=True, - msc3375_redaction_rules=True, - msc2403_knocking=True, + implicit_room_creator=True, # Used by MSC3820 + updated_redaction_rules=True, # Used by MSC3820 + restricted_join_rule=True, + restricted_join_rule_fix=True, + knock_join_rule=True, msc3389_relation_redactions=False, - msc3787_knock_restricted_join_rule=True, - msc3667_int_only_power_levels=True, - msc3821_redaction_rules=True, # Used by MSC3820 + knock_restricted_join_rule=True, + enforce_int_power_levels=True, msc3931_push_features=(), - msc3989_redaction_rules=True, # Used by MSC3820 ) @@ -460,14 +344,11 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = { RoomVersions.V4, RoomVersions.V5, RoomVersions.V6, - RoomVersions.MSC2176, RoomVersions.V7, RoomVersions.V8, RoomVersions.V9, - RoomVersions.MSC3787, RoomVersions.V10, - RoomVersions.MSC3989, - RoomVersions.MSC3820opt2, + RoomVersions.V11, ) } @@ -496,12 +377,12 @@ MSC3244_CAPABILITIES = { RoomVersionCapability( "knock", RoomVersions.V7, - lambda room_version: room_version.msc2403_knocking, + lambda room_version: room_version.knock_join_rule, ), RoomVersionCapability( "restricted", RoomVersions.V9, - lambda room_version: room_version.msc3083_join_rules, + lambda room_version: room_version.restricted_join_rule, ), ) } diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 936b1b0430..a94b57a671 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -386,6 +386,7 @@ def listen_unix( def listen_http( + hs: "HomeServer", listener_config: ListenerConfig, root_resource: Resource, version_string: str, @@ -406,6 +407,7 @@ def listen_http( version_string, max_request_body_size=max_request_body_size, reactor=reactor, + hs=hs, ) if isinstance(listener_config, TCPListenerConfig): diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 7406c3948c..dc79efcc14 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -221,6 +221,7 @@ class GenericWorkerServer(HomeServer): root_resource = create_resource_tree(resources, OptionsResource()) _base.listen_http( + self, listener_config, root_resource, self.version_string, diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 84236ac299..f188c7265a 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -139,6 +139,7 @@ class SynapseHomeServer(HomeServer): root_resource = OptionsResource() ports = listen_http( + self, listener_config, create_resource_tree(resources, root_resource), self.version_string, diff --git a/synapse/config/auth.py b/synapse/config/auth.py index c7ab428f28..3b4c77f572 100644 --- a/synapse/config/auth.py +++ b/synapse/config/auth.py @@ -31,7 +31,7 @@ class AuthConfig(Config): # The default value of password_config.enabled is True, unless msc3861 is enabled. msc3861_enabled = ( - config.get("experimental_features", {}) + (config.get("experimental_features") or {}) .get("msc3861", {}) .get("enabled", False) ) diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 8e0f5356b4..0970f22a75 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -382,9 +382,6 @@ class ExperimentalConfig(Config): # Check that none of the other config options conflict with MSC3861 when enabled self.msc3861.check_config_conflicts(self.root) - # MSC4009: E.164 Matrix IDs - self.msc4009_e164_mxids = experimental.get("msc4009_e164_mxids", False) - # MSC4010: Do not allow setting m.push_rules account data. self.msc4010_push_rules_account_data = experimental.get( "msc4010_push_rules_account_data", False diff --git a/synapse/config/workers.py b/synapse/config/workers.py index ccfe75eaf3..6567fb6bb0 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -15,7 +15,7 @@ import argparse import logging -from typing import Any, Dict, List, Union +from typing import Any, Dict, List, Optional, Union import attr from pydantic import BaseModel, Extra, StrictBool, StrictInt, StrictStr @@ -94,7 +94,7 @@ class ConfigModel(BaseModel): allow_mutation = False -class InstanceLocationConfig(ConfigModel): +class InstanceTcpLocationConfig(ConfigModel): """The host and port to talk to an instance via HTTP replication.""" host: StrictStr @@ -110,6 +110,23 @@ class InstanceLocationConfig(ConfigModel): return f"{self.host}:{self.port}" +class InstanceUnixLocationConfig(ConfigModel): + """The socket file to talk to an instance via HTTP replication.""" + + path: StrictStr + + def scheme(self) -> str: + """Hardcode a retrievable scheme""" + return "unix" + + def netloc(self) -> str: + """Nicely format the address location data""" + return f"{self.path}" + + +InstanceLocationConfig = Union[InstanceTcpLocationConfig, InstanceUnixLocationConfig] + + @attr.s class WriterLocations: """Specifies the instances that write various streams. @@ -154,6 +171,27 @@ class WriterLocations: ) +@attr.s(auto_attribs=True) +class OutboundFederationRestrictedTo: + """Whether we limit outbound federation to a certain set of instances. + + Attributes: + instances: optional list of instances that can make outbound federation + requests. If None then all instances can make federation requests. + locations: list of instance locations to connect to proxy via. + """ + + instances: Optional[List[str]] + locations: List[InstanceLocationConfig] = attr.Factory(list) + + def __contains__(self, instance: str) -> bool: + # It feels a bit dirty to return `True` if `instances` is `None`, but it makes + # sense in downstream usage in the sense that if + # `outbound_federation_restricted_to` is not configured, then any instance can + # talk to federation (no restrictions so always return `True`). + return self.instances is None or instance in self.instances + + class WorkerConfig(Config): """The workers are processes run separately to the main synapse process. They have their own pid_file and listener configuration. They use the @@ -270,9 +308,12 @@ class WorkerConfig(Config): % MAIN_PROCESS_INSTANCE_MAP_NAME ) + # type-ignore: the expression `Union[A, B]` is not a Type[Union[A, B]] currently self.instance_map: Dict[ str, InstanceLocationConfig - ] = parse_and_validate_mapping(instance_map, InstanceLocationConfig) + ] = parse_and_validate_mapping( + instance_map, InstanceLocationConfig # type: ignore[arg-type] + ) # Map from type of streams to source, c.f. WriterLocations. writers = config.get("stream_writers") or {} @@ -365,6 +406,28 @@ class WorkerConfig(Config): new_option_name="update_user_directory_from_worker", ) + outbound_federation_restricted_to = config.get( + "outbound_federation_restricted_to", None + ) + self.outbound_federation_restricted_to = OutboundFederationRestrictedTo( + outbound_federation_restricted_to + ) + if outbound_federation_restricted_to: + if not self.worker_replication_secret: + raise ConfigError( + "`worker_replication_secret` must be configured when using `outbound_federation_restricted_to`." + ) + + for instance in outbound_federation_restricted_to: + if instance not in self.instance_map: + raise ConfigError( + "Instance %r is configured in 'outbound_federation_restricted_to' but does not appear in `instance_map` config." + % (instance,) + ) + self.outbound_federation_restricted_to.locations.append( + self.instance_map[instance] + ) + def _should_this_worker_perform_duty( self, config: Dict[str, Any], diff --git a/synapse/event_auth.py b/synapse/event_auth.py index 3aaf53dfbd..3a260a492b 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -126,7 +126,7 @@ def validate_event_for_room_version(event: "EventBase") -> None: raise AuthError(403, "Event not signed by sending server") is_invite_via_allow_rule = ( - event.room_version.msc3083_join_rules + event.room_version.restricted_join_rule and event.type == EventTypes.Member and event.membership == Membership.JOIN and EventContentFields.AUTHORISING_USER in event.content @@ -352,11 +352,9 @@ LENIENT_EVENT_BYTE_LIMITS_ROOM_VERSIONS = { RoomVersions.V4, RoomVersions.V5, RoomVersions.V6, - RoomVersions.MSC2176, RoomVersions.V7, RoomVersions.V8, RoomVersions.V9, - RoomVersions.MSC3787, RoomVersions.V10, RoomVersions.MSC1767v10, } @@ -449,7 +447,7 @@ def _check_create(event: "EventBase") -> None: # 1.4 If content has no creator field, reject if the room version requires it. if ( - not event.room_version.msc2175_implicit_room_creator + not event.room_version.implicit_room_creator and EventContentFields.ROOM_CREATOR not in event.content ): raise AuthError(403, "Create event lacks a 'creator' property") @@ -486,7 +484,7 @@ def _is_membership_change_allowed( key = (EventTypes.Create, "") create = auth_events.get(key) if create and event.prev_event_ids()[0] == create.event_id: - if room_version.msc2175_implicit_room_creator: + if room_version.implicit_room_creator: creator = create.sender else: creator = create.content[EventContentFields.ROOM_CREATOR] @@ -509,7 +507,7 @@ def _is_membership_change_allowed( caller_invited = caller and caller.membership == Membership.INVITE caller_knocked = ( caller - and room_version.msc2403_knocking + and room_version.knock_join_rule and caller.membership == Membership.KNOCK ) @@ -609,9 +607,9 @@ def _is_membership_change_allowed( elif join_rule == JoinRules.PUBLIC: pass elif ( - room_version.msc3083_join_rules and join_rule == JoinRules.RESTRICTED + room_version.restricted_join_rule and join_rule == JoinRules.RESTRICTED ) or ( - room_version.msc3787_knock_restricted_join_rule + room_version.knock_restricted_join_rule and join_rule == JoinRules.KNOCK_RESTRICTED ): # This is the same as public, but the event must contain a reference @@ -641,9 +639,9 @@ def _is_membership_change_allowed( elif ( join_rule == JoinRules.INVITE - or (room_version.msc2403_knocking and join_rule == JoinRules.KNOCK) + or (room_version.knock_join_rule and join_rule == JoinRules.KNOCK) or ( - room_version.msc3787_knock_restricted_join_rule + room_version.knock_restricted_join_rule and join_rule == JoinRules.KNOCK_RESTRICTED ) ): @@ -677,9 +675,9 @@ def _is_membership_change_allowed( "You don't have permission to ban", errcode=Codes.INSUFFICIENT_POWER, ) - elif room_version.msc2403_knocking and Membership.KNOCK == membership: + elif room_version.knock_join_rule and Membership.KNOCK == membership: if join_rule != JoinRules.KNOCK and ( - not room_version.msc3787_knock_restricted_join_rule + not room_version.knock_restricted_join_rule or join_rule != JoinRules.KNOCK_RESTRICTED ): raise AuthError(403, "You don't have permission to knock") @@ -836,7 +834,7 @@ def _check_power_levels( # Reject events with stringy power levels if required by room version if ( event.type == EventTypes.PowerLevels - and room_version_obj.msc3667_int_only_power_levels + and room_version_obj.enforce_int_power_levels ): for k, v in event.content.items(): if k in { @@ -972,7 +970,7 @@ def get_user_power_level(user_id: str, auth_events: StateMap["EventBase"]) -> in key = (EventTypes.Create, "") create_event = auth_events.get(key) if create_event is not None: - if create_event.room_version.msc2175_implicit_room_creator: + if create_event.room_version.implicit_room_creator: creator = create_event.sender else: creator = create_event.content[EventContentFields.ROOM_CREATOR] @@ -1110,7 +1108,7 @@ def auth_types_for_event( ) auth_types.add(key) - if room_version.msc3083_join_rules and membership == Membership.JOIN: + if room_version.restricted_join_rule and membership == Membership.JOIN: if EventContentFields.AUTHORISING_USER in event.content: key = ( EventTypes.Member, diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 75b62adb33..35257a3b1b 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -346,7 +346,7 @@ class EventBase(metaclass=abc.ABCMeta): @property def redacts(self) -> Optional[str]: """MSC2176 moved the redacts field into the content.""" - if self.room_version.msc2176_redaction_rules: + if self.room_version.updated_redaction_rules: return self.content.get("redacts") return self.get("redacts") diff --git a/synapse/events/builder.py b/synapse/events/builder.py index a254548c6c..14ea0e6640 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -175,7 +175,7 @@ class EventBuilder: # MSC2174 moves the redacts property to the content, it is invalid to # provide it as a top-level property. - if self._redacts is not None and not self.room_version.msc2176_redaction_rules: + if self._redacts is not None and not self.room_version.updated_redaction_rules: event_dict["redacts"] = self._redacts if self._origin_server_ts is not None: diff --git a/synapse/events/utils.py b/synapse/events/utils.py index a55efcca56..ecfc5c0568 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -108,13 +108,9 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic "origin_server_ts", ] - # Room versions from before MSC2176 had additional allowed keys. - if not room_version.msc2176_redaction_rules: - allowed_keys.extend(["prev_state", "membership"]) - - # Room versions before MSC3989 kept the origin field. - if not room_version.msc3989_redaction_rules: - allowed_keys.append("origin") + # Earlier room versions from had additional allowed keys. + if not room_version.updated_redaction_rules: + allowed_keys.extend(["prev_state", "membership", "origin"]) event_type = event_dict["type"] @@ -127,9 +123,9 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic if event_type == EventTypes.Member: add_fields("membership") - if room_version.msc3375_redaction_rules: + if room_version.restricted_join_rule_fix: add_fields(EventContentFields.AUTHORISING_USER) - if room_version.msc3821_redaction_rules: + if room_version.updated_redaction_rules: # Preserve the signed field under third_party_invite. third_party_invite = event_dict["content"].get("third_party_invite") if isinstance(third_party_invite, collections.abc.Mapping): @@ -141,13 +137,13 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic elif event_type == EventTypes.Create: # MSC2176 rules state that create events cannot be redacted. - if room_version.msc2176_redaction_rules: + if room_version.updated_redaction_rules: return event_dict add_fields("creator") elif event_type == EventTypes.JoinRules: add_fields("join_rule") - if room_version.msc3083_join_rules: + if room_version.restricted_join_rule: add_fields("allow") elif event_type == EventTypes.PowerLevels: add_fields( @@ -161,14 +157,14 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic "redact", ) - if room_version.msc2176_redaction_rules: + if room_version.updated_redaction_rules: add_fields("invite") elif event_type == EventTypes.Aliases and room_version.special_case_aliases_auth: add_fields("aliases") elif event_type == EventTypes.RoomHistoryVisibility: add_fields("history_visibility") - elif event_type == EventTypes.Redaction and room_version.msc2176_redaction_rules: + elif event_type == EventTypes.Redaction and room_version.updated_redaction_rules: add_fields("redacts") # Protect the rel_type and event_id fields under the m.relates_to field. @@ -477,6 +473,15 @@ def serialize_event( if config.as_client_event: d = config.event_format(d) + # If the event is a redaction, copy the redacts field from the content to + # top-level for backwards compatibility. + if ( + e.type == EventTypes.Redaction + and e.room_version.updated_redaction_rules + and e.redacts is not None + ): + d["redacts"] = e.redacts + only_event_fields = config.only_event_fields if only_event_fields: if not isinstance(only_event_fields, list) or not all( diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index b77022b406..31e0260b83 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -231,7 +231,7 @@ async def _check_sigs_on_pdu( # If this is a join event for a restricted room it may have been authorised # via a different server from the sending server. Check those signatures. if ( - room_version.msc3083_join_rules + room_version.restricted_join_rule and pdu.type == EventTypes.Member and pdu.membership == Membership.JOIN and EventContentFields.AUTHORISING_USER in pdu.content diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index e5359ca558..89bd597409 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -983,7 +983,7 @@ class FederationClient(FederationBase): if not room_version: raise UnsupportedRoomVersionError() - if not room_version.msc2403_knocking and membership == Membership.KNOCK: + if not room_version.knock_join_rule and membership == Membership.KNOCK: raise SynapseError( 400, "This room version does not support knocking", @@ -1069,7 +1069,7 @@ class FederationClient(FederationBase): # * Ensure the signatures are good. # # Otherwise, fallback to the provided event. - if room_version.msc3083_join_rules and response.event: + if room_version.restricted_join_rule and response.event: event = response.event valid_pdu = await self._check_sigs_and_hash_and_fetch_one( @@ -1195,7 +1195,7 @@ class FederationClient(FederationBase): # MSC3083 defines additional error codes for room joins. failover_errcodes = None - if room_version.msc3083_join_rules: + if room_version.restricted_join_rule: failover_errcodes = ( Codes.UNABLE_AUTHORISE_JOIN, Codes.UNABLE_TO_GRANT_JOIN, diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 61fa3b30af..fa61dd8c10 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -806,7 +806,7 @@ class FederationServer(FederationBase): raise IncompatibleRoomVersionError(room_version=room_version.identifier) # Check that this room supports knocking as defined by its room version - if not room_version.msc2403_knocking: + if not room_version.knock_join_rule: raise SynapseError( 403, "This room version does not support knocking", @@ -909,7 +909,7 @@ class FederationServer(FederationBase): errcode=Codes.NOT_FOUND, ) - if membership_type == Membership.KNOCK and not room_version.msc2403_knocking: + if membership_type == Membership.KNOCK and not room_version.knock_join_rule: raise SynapseError( 403, "This room version does not support knocking", @@ -933,7 +933,7 @@ class FederationServer(FederationBase): # the event is valid to be sent into the room. Currently this is only done # if the user is being joined via restricted join rules. if ( - room_version.msc3083_join_rules + room_version.restricted_join_rule and event.membership == Membership.JOIN and EventContentFields.AUTHORISING_USER in event.content ): diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index 3a744e25be..3248953b48 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -432,15 +432,6 @@ class FederationV2SendJoinServlet(BaseFederationServerServlet): PREFIX = FEDERATION_V2_PREFIX - def __init__( - self, - hs: "HomeServer", - authenticator: Authenticator, - ratelimiter: FederationRateLimiter, - server_name: str, - ): - super().__init__(hs, authenticator, ratelimiter, server_name) - async def on_PUT( self, origin: str, diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 1e0623c7f8..623a4e7b1d 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -277,7 +277,9 @@ class DirectoryHandler: except RequestSendFailed: raise SynapseError(502, "Failed to fetch alias") except CodeMessageException as e: - logging.warning("Error retrieving alias") + logging.warning( + "Error retrieving alias %s -> %s %s", room_alias, e.code, e.msg + ) if e.code == 404: fed_result = None else: diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py index 3e37c0cbe2..82a7617a08 100644 --- a/synapse/handlers/event_auth.py +++ b/synapse/handlers/event_auth.py @@ -277,7 +277,7 @@ class EventAuthHandler: True if the proper room version and join rules are set for restricted access. """ # This only applies to room versions which support the new join rule. - if not room_version.msc3083_join_rules: + if not room_version.restricted_join_rule: return False # If there's no join rule, then it defaults to invite (so this doesn't apply). @@ -292,7 +292,7 @@ class EventAuthHandler: return True # also check for MSC3787 behaviour - if room_version.msc3787_knock_restricted_join_rule: + if room_version.knock_restricted_join_rule: return content_join_rule == JoinRules.KNOCK_RESTRICTED return False diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index cc5ed97730..15b9fbe44a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -957,7 +957,7 @@ class FederationHandler: # Note that this requires the /send_join request to come back to the # same server. prev_event_ids = None - if room_version.msc3083_join_rules: + if room_version.restricted_join_rule: # Note that the room's state can change out from under us and render our # nice join rules-conformant event non-conformant by the time we build the # event. When this happens, our validation at the end fails and we respond diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 0a219b7962..cd7df0525f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -95,13 +95,12 @@ bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"]) notify_reason_counter = Counter( - "synapse_handler_presence_notify_reason", "", ["reason"] + "synapse_handler_presence_notify_reason", "", ["locality", "reason"] ) state_transition_counter = Counter( - "synapse_handler_presence_state_transition", "", ["from", "to"] + "synapse_handler_presence_state_transition", "", ["locality", "from", "to"] ) - # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them # "currently_active" LAST_ACTIVE_GRANULARITY = 60 * 1000 @@ -567,8 +566,8 @@ class WorkerPresenceHandler(BasePresenceHandler): for new_state in states: old_state = self.user_to_current_state.get(new_state.user_id) self.user_to_current_state[new_state.user_id] = new_state - - if not old_state or should_notify(old_state, new_state): + is_mine = self.is_mine_id(new_state.user_id) + if not old_state or should_notify(old_state, new_state, is_mine): state_to_notify.append(new_state) stream_id = token @@ -1499,23 +1498,31 @@ class PresenceHandler(BasePresenceHandler): ) -def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool: +def should_notify( + old_state: UserPresenceState, new_state: UserPresenceState, is_mine: bool +) -> bool: """Decides if a presence state change should be sent to interested parties.""" + user_location = "remote" + if is_mine: + user_location = "local" + if old_state == new_state: return False if old_state.status_msg != new_state.status_msg: - notify_reason_counter.labels("status_msg_change").inc() + notify_reason_counter.labels(user_location, "status_msg_change").inc() return True if old_state.state != new_state.state: - notify_reason_counter.labels("state_change").inc() - state_transition_counter.labels(old_state.state, new_state.state).inc() + notify_reason_counter.labels(user_location, "state_change").inc() + state_transition_counter.labels( + user_location, old_state.state, new_state.state + ).inc() return True if old_state.state == PresenceState.ONLINE: if new_state.currently_active != old_state.currently_active: - notify_reason_counter.labels("current_active_change").inc() + notify_reason_counter.labels(user_location, "current_active_change").inc() return True if ( @@ -1524,12 +1531,16 @@ def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> ): # Only notify about last active bumps if we're not currently active if not new_state.currently_active: - notify_reason_counter.labels("last_active_change_online").inc() + notify_reason_counter.labels( + user_location, "last_active_change_online" + ).inc() return True elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Always notify for a transition where last active gets bumped. - notify_reason_counter.labels("last_active_change_not_online").inc() + notify_reason_counter.labels( + user_location, "last_active_change_not_online" + ).inc() return True return False @@ -1989,7 +2000,7 @@ def handle_update( ) # Check whether the change was something worth notifying about - if should_notify(prev_state, new_state): + if should_notify(prev_state, new_state, is_mine): new_state = new_state.copy_and_replace(last_federation_update_ts=now) persist_and_notify = True diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index a2d3f03061..3a55056df5 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -143,15 +143,10 @@ class RegistrationHandler: assigned_user_id: Optional[str] = None, inhibit_user_in_use_error: bool = False, ) -> None: - if types.contains_invalid_mxid_characters( - localpart, self.hs.config.experimental.msc4009_e164_mxids - ): - extra_chars = ( - "=_-./+" if self.hs.config.experimental.msc4009_e164_mxids else "=_-./" - ) + if types.contains_invalid_mxid_characters(localpart): raise SynapseError( 400, - f"User ID can only contain characters a-z, 0-9, or '{extra_chars}'", + "User ID can only contain characters a-z, 0-9, or '=_-./+'", Codes.INVALID_USERNAME, ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index bf907b7881..0513e28aab 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1116,7 +1116,7 @@ class RoomCreationHandler: preset_config, config = self._room_preset_config(room_config) # MSC2175 removes the creator field from the create event. - if not room_version.msc2175_implicit_room_creator: + if not room_version.implicit_room_creator: creation_content["creator"] = creator_id creation_event, unpersisted_creation_context = await create_event( EventTypes.Create, creation_content, False diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py index 807245160d..dad3e23470 100644 --- a/synapse/handlers/room_summary.py +++ b/synapse/handlers/room_summary.py @@ -564,9 +564,9 @@ class RoomSummaryHandler: join_rule = join_rules_event.content.get("join_rule") if ( join_rule == JoinRules.PUBLIC - or (room_version.msc2403_knocking and join_rule == JoinRules.KNOCK) + or (room_version.knock_join_rule and join_rule == JoinRules.KNOCK) or ( - room_version.msc3787_knock_restricted_join_rule + room_version.knock_restricted_join_rule and join_rule == JoinRules.KNOCK_RESTRICTED ) ): diff --git a/synapse/handlers/saml.py b/synapse/handlers/saml.py index 874860d461..6083c9f4b5 100644 --- a/synapse/handlers/saml.py +++ b/synapse/handlers/saml.py @@ -27,9 +27,9 @@ from synapse.http.servlet import parse_string from synapse.http.site import SynapseRequest from synapse.module_api import ModuleApi from synapse.types import ( + MXID_LOCALPART_ALLOWED_CHARACTERS, UserID, map_username_to_mxid_localpart, - mxid_localpart_allowed_characters, ) from synapse.util.iterutils import chunk_seq @@ -371,7 +371,7 @@ class SamlHandler: DOT_REPLACE_PATTERN = re.compile( - "[^%s]" % (re.escape("".join(mxid_localpart_allowed_characters)),) + "[^%s]" % (re.escape("".join(MXID_LOCALPART_ALLOWED_CHARACTERS)),) ) diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py index c3a51722bd..4d29328a74 100644 --- a/synapse/handlers/sso.py +++ b/synapse/handlers/sso.py @@ -225,8 +225,6 @@ class SsoHandler: self._consent_at_registration = hs.config.consent.user_consent_at_registration - self._e164_mxids = hs.config.experimental.msc4009_e164_mxids - def register_identity_provider(self, p: SsoIdentityProvider) -> None: p_id = p.idp_id assert p_id not in self._identity_providers @@ -713,7 +711,7 @@ class SsoHandler: # Since the localpart is provided via a potentially untrusted module, # ensure the MXID is valid before registering. if not attributes.localpart or contains_invalid_mxid_characters( - attributes.localpart, self._e164_mxids + attributes.localpart ): raise MappingException("localpart is invalid: %s" % (attributes.localpart,)) @@ -946,7 +944,7 @@ class SsoHandler: localpart, ) - if contains_invalid_mxid_characters(localpart, self._e164_mxids): + if contains_invalid_mxid_characters(localpart): raise SynapseError(400, "localpart is invalid: %s" % (localpart,)) user_id = UserID(localpart, self._server_name).to_string() user_infos = await self._store.get_users_by_id_case_insensitive(user_id) diff --git a/synapse/http/client.py b/synapse/http/client.py index 09ea93e10d..ca2cdbc6e2 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -1037,7 +1037,12 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol): if reason.check(ResponseDone): self.deferred.callback(self.length) elif reason.check(PotentialDataLoss): - # stolen from https://github.com/twisted/treq/pull/49/files + # This applies to requests which don't set `Content-Length` or a + # `Transfer-Encoding` in the response because in this case the end of the + # response is indicated by the connection being closed, an event which may + # also be due to a transient network problem or other error. But since this + # behavior is expected of some servers (like YouTube), let's ignore it. + # Stolen from https://github.com/twisted/treq/pull/49/files # http://twistedmatrix.com/trac/ticket/4840 self.deferred.callback(self.length) else: diff --git a/synapse/http/connectproxyclient.py b/synapse/http/connectproxyclient.py index 23a60af171..636efc33e8 100644 --- a/synapse/http/connectproxyclient.py +++ b/synapse/http/connectproxyclient.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import abc import base64 import logging from typing import Optional, Union @@ -39,8 +40,14 @@ class ProxyConnectError(ConnectError): pass -@attr.s(auto_attribs=True) class ProxyCredentials: + @abc.abstractmethod + def as_proxy_authorization_value(self) -> bytes: + raise NotImplementedError() + + +@attr.s(auto_attribs=True) +class BasicProxyCredentials(ProxyCredentials): username_password: bytes def as_proxy_authorization_value(self) -> bytes: @@ -55,6 +62,17 @@ class ProxyCredentials: return b"Basic " + base64.encodebytes(self.username_password) +@attr.s(auto_attribs=True) +class BearerProxyCredentials(ProxyCredentials): + access_token: bytes + + def as_proxy_authorization_value(self) -> bytes: + """ + Return the value for a Proxy-Authorization header (i.e. 'Bearer xxx'). + """ + return b"Bearer " + self.access_token + + @implementer(IStreamClientEndpoint) class HTTPConnectProxyEndpoint: """An Endpoint implementation which will send a CONNECT request to an http proxy diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index cc4e258b0f..583c03447c 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -50,7 +50,7 @@ from twisted.internet.interfaces import IReactorTime from twisted.internet.task import Cooperator from twisted.web.client import ResponseFailed from twisted.web.http_headers import Headers -from twisted.web.iweb import IBodyProducer, IResponse +from twisted.web.iweb import IAgent, IBodyProducer, IResponse import synapse.metrics import synapse.util.retryutils @@ -71,7 +71,9 @@ from synapse.http.client import ( encode_query_args, read_body_with_max_size, ) +from synapse.http.connectproxyclient import BearerProxyCredentials from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent +from synapse.http.proxyagent import ProxyAgent from synapse.http.types import QueryParams from synapse.logging import opentracing from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -393,17 +395,41 @@ class MatrixFederationHttpClient: if hs.config.server.user_agent_suffix: user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix) - federation_agent = MatrixFederationAgent( - self.reactor, - tls_client_options_factory, - user_agent.encode("ascii"), - hs.config.server.federation_ip_range_allowlist, - hs.config.server.federation_ip_range_blocklist, + outbound_federation_restricted_to = ( + hs.config.worker.outbound_federation_restricted_to ) + if hs.get_instance_name() in outbound_federation_restricted_to: + # Talk to federation directly + federation_agent: IAgent = MatrixFederationAgent( + self.reactor, + tls_client_options_factory, + user_agent.encode("ascii"), + hs.config.server.federation_ip_range_allowlist, + hs.config.server.federation_ip_range_blocklist, + ) + else: + proxy_authorization_secret = hs.config.worker.worker_replication_secret + assert ( + proxy_authorization_secret is not None + ), "`worker_replication_secret` must be set when using `outbound_federation_restricted_to` (used to authenticate requests across workers)" + federation_proxy_credentials = BearerProxyCredentials( + proxy_authorization_secret.encode("ascii") + ) + + # We need to talk to federation via the proxy via one of the configured + # locations + federation_proxy_locations = outbound_federation_restricted_to.locations + federation_agent = ProxyAgent( + self.reactor, + self.reactor, + tls_client_options_factory, + federation_proxy_locations=federation_proxy_locations, + federation_proxy_credentials=federation_proxy_credentials, + ) # Use a BlocklistingAgentWrapper to prevent circumventing the IP # blocking via IP literals in server names - self.agent = BlocklistingAgentWrapper( + self.agent: IAgent = BlocklistingAgentWrapper( federation_agent, ip_blocklist=hs.config.server.federation_ip_range_blocklist, ) @@ -412,7 +438,6 @@ class MatrixFederationHttpClient: self._store = hs.get_datastores().main self.version_string_bytes = hs.version_string.encode("ascii") self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000 - self.max_long_retry_delay_seconds = ( hs.config.federation.max_long_retry_delay_ms / 1000 ) @@ -1141,6 +1166,101 @@ class MatrixFederationHttpClient: RequestSendFailed: If there were problems connecting to the remote, due to e.g. DNS failures, connection timeouts etc. """ + json_dict, _ = await self.get_json_with_headers( + destination=destination, + path=path, + args=args, + retry_on_dns_fail=retry_on_dns_fail, + timeout=timeout, + ignore_backoff=ignore_backoff, + try_trailing_slash_on_400=try_trailing_slash_on_400, + parser=parser, + ) + return json_dict + + @overload + async def get_json_with_headers( + self, + destination: str, + path: str, + args: Optional[QueryParams] = None, + retry_on_dns_fail: bool = True, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + try_trailing_slash_on_400: bool = False, + parser: Literal[None] = None, + ) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]: + ... + + @overload + async def get_json_with_headers( + self, + destination: str, + path: str, + args: Optional[QueryParams] = ..., + retry_on_dns_fail: bool = ..., + timeout: Optional[int] = ..., + ignore_backoff: bool = ..., + try_trailing_slash_on_400: bool = ..., + parser: ByteParser[T] = ..., + ) -> Tuple[T, Dict[bytes, List[bytes]]]: + ... + + async def get_json_with_headers( + self, + destination: str, + path: str, + args: Optional[QueryParams] = None, + retry_on_dns_fail: bool = True, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + try_trailing_slash_on_400: bool = False, + parser: Optional[ByteParser[T]] = None, + ) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]: + """GETs some json from the given host homeserver and path + + Args: + destination: The remote server to send the HTTP request to. + + path: The HTTP path. + + args: A dictionary used to create query strings, defaults to + None. + + retry_on_dns_fail: true if the request should be retried on DNS failures + + timeout: number of milliseconds to wait for the response. + self._default_timeout (60s) by default. + + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + + ignore_backoff: true to ignore the historical backoff data + and try the request anyway. + + try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED + response we should try appending a trailing slash to the end of + the request. Workaround for #3622 in Synapse <= v0.99.3. + + parser: The parser to use to decode the response. Defaults to + parsing as JSON. + + Returns: + Succeeds when we get a 2xx HTTP response. The result will be a tuple of the + decoded JSON body and a dict of the response headers. + + Raises: + HttpResponseException: If we get an HTTP response code >= 300 + (except 429). + NotRetryingDestination: If we are not yet ready to retry this + server. + FederationDeniedError: If this destination is not on our + federation whitelist + RequestSendFailed: If there were problems connecting to the + remote, due to e.g. DNS failures, connection timeouts etc. + """ request = MatrixFederationRequest( method="GET", destination=destination, path=path, query=args ) @@ -1156,6 +1276,8 @@ class MatrixFederationHttpClient: timeout=timeout, ) + headers = dict(response.headers.getAllRawHeaders()) + if timeout is not None: _sec_timeout = timeout / 1000 else: @@ -1173,7 +1295,7 @@ class MatrixFederationHttpClient: parser=parser, ) - return body + return body, headers async def delete_json( self, diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py new file mode 100644 index 0000000000..c9f51e51bc --- /dev/null +++ b/synapse/http/proxy.py @@ -0,0 +1,283 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import json +import logging +import urllib.parse +from typing import TYPE_CHECKING, Any, Optional, Set, Tuple, cast + +from twisted.internet import protocol +from twisted.internet.interfaces import ITCPTransport +from twisted.internet.protocol import connectionDone +from twisted.python import failure +from twisted.python.failure import Failure +from twisted.web.client import ResponseDone +from twisted.web.http_headers import Headers +from twisted.web.iweb import IResponse +from twisted.web.resource import IResource +from twisted.web.server import Request, Site + +from synapse.api.errors import Codes, InvalidProxyCredentialsError +from synapse.http import QuieterFileBodyProducer +from synapse.http.server import _AsyncResource +from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.types import ISynapseReactor +from synapse.util.async_helpers import timeout_deferred + +if TYPE_CHECKING: + from synapse.http.site import SynapseRequest + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + +# "Hop-by-hop" headers (as opposed to "end-to-end" headers) as defined by RFC2616 +# section 13.5.1 and referenced in RFC9110 section 7.6.1. These are meant to only be +# consumed by the immediate recipient and not be forwarded on. +HOP_BY_HOP_HEADERS = { + "Connection", + "Keep-Alive", + "Proxy-Authenticate", + "Proxy-Authorization", + "TE", + "Trailers", + "Transfer-Encoding", + "Upgrade", +} + + +def parse_connection_header_value( + connection_header_value: Optional[bytes], +) -> Set[str]: + """ + Parse the `Connection` header to determine which headers we should not be copied + over from the remote response. + + As defined by RFC2616 section 14.10 and RFC9110 section 7.6.1 + + Example: `Connection: close, X-Foo, X-Bar` will return `{"Close", "X-Foo", "X-Bar"}` + + Even though "close" is a special directive, let's just treat it as just another + header for simplicity. If people want to check for this directive, they can simply + check for `"Close" in headers`. + + Args: + connection_header_value: The value of the `Connection` header. + + Returns: + The set of header names that should not be copied over from the remote response. + The keys are capitalized in canonical capitalization. + """ + headers = Headers() + extra_headers_to_remove: Set[str] = set() + if connection_header_value: + extra_headers_to_remove = { + headers._canonicalNameCaps(connection_option.strip()).decode("ascii") + for connection_option in connection_header_value.split(b",") + } + + return extra_headers_to_remove + + +class ProxyResource(_AsyncResource): + """ + A stub resource that proxies any requests with a `matrix-federation://` scheme + through the given `federation_agent` to the remote homeserver and ferries back the + info. + """ + + isLeaf = True + + def __init__(self, reactor: ISynapseReactor, hs: "HomeServer"): + super().__init__(True) + + self.reactor = reactor + self.agent = hs.get_federation_http_client().agent + + self._proxy_authorization_secret = hs.config.worker.worker_replication_secret + + def _check_auth(self, request: Request) -> None: + # The `matrix-federation://` proxy functionality can only be used with auth. + # Protect homserver admins forgetting to configure a secret. + assert self._proxy_authorization_secret is not None + + # Get the authorization header. + auth_headers = request.requestHeaders.getRawHeaders(b"Proxy-Authorization") + + if not auth_headers: + raise InvalidProxyCredentialsError( + "Missing Proxy-Authorization header.", Codes.MISSING_TOKEN + ) + if len(auth_headers) > 1: + raise InvalidProxyCredentialsError( + "Too many Proxy-Authorization headers.", Codes.UNAUTHORIZED + ) + parts = auth_headers[0].split(b" ") + if parts[0] == b"Bearer" and len(parts) == 2: + received_secret = parts[1].decode("ascii") + if self._proxy_authorization_secret == received_secret: + # Success! + return + + raise InvalidProxyCredentialsError( + "Invalid Proxy-Authorization header.", Codes.UNAUTHORIZED + ) + + async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]: + uri = urllib.parse.urlparse(request.uri) + assert uri.scheme == b"matrix-federation" + + # Check the authorization headers before handling the request. + self._check_auth(request) + + headers = Headers() + for header_name in (b"User-Agent", b"Authorization", b"Content-Type"): + header_value = request.getHeader(header_name) + if header_value: + headers.addRawHeader(header_name, header_value) + + request_deferred = run_in_background( + self.agent.request, + request.method, + request.uri, + headers=headers, + bodyProducer=QuieterFileBodyProducer(request.content), + ) + request_deferred = timeout_deferred( + request_deferred, + # This should be set longer than the timeout in `MatrixFederationHttpClient` + # so that it has enough time to complete and pass us the data before we give + # up. + timeout=90, + reactor=self.reactor, + ) + + response = await make_deferred_yieldable(request_deferred) + + return response.code, response + + def _send_response( + self, + request: "SynapseRequest", + code: int, + response_object: Any, + ) -> None: + response = cast(IResponse, response_object) + response_headers = cast(Headers, response.headers) + + request.setResponseCode(code) + + # The `Connection` header also defines which headers should not be copied over. + connection_header = response_headers.getRawHeaders(b"connection") + extra_headers_to_remove = parse_connection_header_value( + connection_header[0] if connection_header else None + ) + + # Copy headers. + for k, v in response_headers.getAllRawHeaders(): + # Do not copy over any hop-by-hop headers. These are meant to only be + # consumed by the immediate recipient and not be forwarded on. + header_key = k.decode("ascii") + if ( + header_key in HOP_BY_HOP_HEADERS + or header_key in extra_headers_to_remove + ): + continue + + request.responseHeaders.setRawHeaders(k, v) + + response.deliverBody(_ProxyResponseBody(request)) + + def _send_error_response( + self, + f: failure.Failure, + request: "SynapseRequest", + ) -> None: + if isinstance(f.value, InvalidProxyCredentialsError): + error_response_code = f.value.code + error_response_json = {"errcode": f.value.errcode, "err": f.value.msg} + else: + error_response_code = 502 + error_response_json = { + "errcode": Codes.UNKNOWN, + "err": "ProxyResource: Error when proxying request: %s %s -> %s" + % ( + request.method.decode("ascii"), + request.uri.decode("ascii"), + f, + ), + } + + request.setResponseCode(error_response_code) + request.setHeader(b"Content-Type", b"application/json") + request.write((json.dumps(error_response_json)).encode()) + request.finish() + + +class _ProxyResponseBody(protocol.Protocol): + """ + A protocol that proxies the given remote response data back out to the given local + request. + """ + + transport: Optional[ITCPTransport] = None + + def __init__(self, request: "SynapseRequest") -> None: + self._request = request + + def dataReceived(self, data: bytes) -> None: + # Avoid sending response data to the local request that already disconnected + if self._request._disconnected and self.transport is not None: + # Close the connection (forcefully) since all the data will get + # discarded anyway. + self.transport.abortConnection() + return + + self._request.write(data) + + def connectionLost(self, reason: Failure = connectionDone) -> None: + # If the local request is already finished (successfully or failed), don't + # worry about sending anything back. + if self._request.finished: + return + + if reason.check(ResponseDone): + self._request.finish() + else: + # Abort the underlying request since our remote request also failed. + self._request.transport.abortConnection() + + +class ProxySite(Site): + """ + Proxies any requests with a `matrix-federation://` scheme through the given + `federation_agent`. Otherwise, behaves like a normal `Site`. + """ + + def __init__( + self, + resource: IResource, + reactor: ISynapseReactor, + hs: "HomeServer", + ): + super().__init__(resource, reactor=reactor) + + self._proxy_resource = ProxyResource(reactor, hs=hs) + + def getResourceFor(self, request: "SynapseRequest") -> IResource: + uri = urllib.parse.urlparse(request.uri) + if uri.scheme == b"matrix-federation": + return self._proxy_resource + + return super().getResourceFor(request) diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index 7bdc4acae7..59ab8fad35 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -12,8 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import random import re -from typing import Any, Dict, Optional, Tuple +from typing import Any, Collection, Dict, List, Optional, Sequence, Tuple from urllib.parse import urlparse from urllib.request import ( # type: ignore[attr-defined] getproxies_environment, @@ -23,8 +24,17 @@ from urllib.request import ( # type: ignore[attr-defined] from zope.interface import implementer from twisted.internet import defer -from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS -from twisted.internet.interfaces import IReactorCore, IStreamClientEndpoint +from twisted.internet.endpoints import ( + HostnameEndpoint, + UNIXClientEndpoint, + wrapClientTLS, +) +from twisted.internet.interfaces import ( + IProtocol, + IProtocolFactory, + IReactorCore, + IStreamClientEndpoint, +) from twisted.python.failure import Failure from twisted.web.client import ( URI, @@ -36,8 +46,18 @@ from twisted.web.error import SchemeNotSupported from twisted.web.http_headers import Headers from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS, IResponse +from synapse.config.workers import ( + InstanceLocationConfig, + InstanceTcpLocationConfig, + InstanceUnixLocationConfig, +) from synapse.http import redact_uri -from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint, ProxyCredentials +from synapse.http.connectproxyclient import ( + BasicProxyCredentials, + HTTPConnectProxyEndpoint, + ProxyCredentials, +) +from synapse.logging.context import run_in_background logger = logging.getLogger(__name__) @@ -74,6 +94,14 @@ class ProxyAgent(_AgentBase): use_proxy: Whether proxy settings should be discovered and used from conventional environment variables. + federation_proxy_locations: An optional list of locations to proxy outbound federation + traffic through (only requests that use the `matrix-federation://` scheme + will be proxied). + + federation_proxy_credentials: Required if `federation_proxy_locations` is set. The + credentials to use when proxying outbound federation traffic through another + worker. + Raises: ValueError if use_proxy is set and the environment variables contain an invalid proxy specification. @@ -89,6 +117,8 @@ class ProxyAgent(_AgentBase): bindAddress: Optional[bytes] = None, pool: Optional[HTTPConnectionPool] = None, use_proxy: bool = False, + federation_proxy_locations: Collection[InstanceLocationConfig] = (), + federation_proxy_credentials: Optional[ProxyCredentials] = None, ): contextFactory = contextFactory or BrowserLikePolicyForHTTPS() @@ -127,6 +157,47 @@ class ProxyAgent(_AgentBase): self._policy_for_https = contextFactory self._reactor = reactor + self._federation_proxy_endpoint: Optional[IStreamClientEndpoint] = None + self._federation_proxy_credentials: Optional[ProxyCredentials] = None + if federation_proxy_locations: + assert ( + federation_proxy_credentials is not None + ), "`federation_proxy_credentials` are required when using `federation_proxy_locations`" + + endpoints: List[IStreamClientEndpoint] = [] + for federation_proxy_location in federation_proxy_locations: + endpoint: IStreamClientEndpoint + if isinstance(federation_proxy_location, InstanceTcpLocationConfig): + endpoint = HostnameEndpoint( + self.proxy_reactor, + federation_proxy_location.host, + federation_proxy_location.port, + ) + if federation_proxy_location.tls: + tls_connection_creator = ( + self._policy_for_https.creatorForNetloc( + federation_proxy_location.host.encode("utf-8"), + federation_proxy_location.port, + ) + ) + endpoint = wrapClientTLS(tls_connection_creator, endpoint) + + elif isinstance(federation_proxy_location, InstanceUnixLocationConfig): + endpoint = UNIXClientEndpoint( + self.proxy_reactor, federation_proxy_location.path + ) + + else: + # It is supremely unlikely we ever hit this + raise SchemeNotSupported( + f"Unknown type of Endpoint requested, check {federation_proxy_location}" + ) + + endpoints.append(endpoint) + + self._federation_proxy_endpoint = _RandomSampleEndpoints(endpoints) + self._federation_proxy_credentials = federation_proxy_credentials + def request( self, method: bytes, @@ -214,6 +285,25 @@ class ProxyAgent(_AgentBase): parsed_uri.port, self.https_proxy_creds, ) + elif ( + parsed_uri.scheme == b"matrix-federation" + and self._federation_proxy_endpoint + ): + assert ( + self._federation_proxy_credentials is not None + ), "`federation_proxy_credentials` are required when using `federation_proxy_locations`" + + # Set a Proxy-Authorization header + if headers is None: + headers = Headers() + # We always need authentication for the outbound federation proxy + headers.addRawHeader( + b"Proxy-Authorization", + self._federation_proxy_credentials.as_proxy_authorization_value(), + ) + + endpoint = self._federation_proxy_endpoint + request_path = uri else: # not using a proxy endpoint = HostnameEndpoint( @@ -233,6 +323,11 @@ class ProxyAgent(_AgentBase): endpoint = wrapClientTLS(tls_connection_creator, endpoint) elif parsed_uri.scheme == b"http": pass + elif ( + parsed_uri.scheme == b"matrix-federation" + and self._federation_proxy_endpoint + ): + pass else: return defer.fail( Failure( @@ -334,6 +429,42 @@ def parse_proxy( credentials = None if url.username and url.password: - credentials = ProxyCredentials(b"".join([url.username, b":", url.password])) + credentials = BasicProxyCredentials( + b"".join([url.username, b":", url.password]) + ) return url.scheme, url.hostname, url.port or default_port, credentials + + +@implementer(IStreamClientEndpoint) +class _RandomSampleEndpoints: + """An endpoint that randomly iterates through a given list of endpoints at + each connection attempt. + """ + + def __init__( + self, + endpoints: Sequence[IStreamClientEndpoint], + ) -> None: + assert endpoints + self._endpoints = endpoints + + def __repr__(self) -> str: + return f"<_RandomSampleEndpoints endpoints={self._endpoints}>" + + def connect( + self, protocol_factory: IProtocolFactory + ) -> "defer.Deferred[IProtocol]": + """Implements IStreamClientEndpoint interface""" + + return run_in_background(self._do_connect, protocol_factory) + + async def _do_connect(self, protocol_factory: IProtocolFactory) -> IProtocol: + failures: List[Failure] = [] + for endpoint in random.sample(self._endpoints, k=len(self._endpoints)): + try: + return await endpoint.connect(protocol_factory) + except Exception: + failures.append(Failure()) + + failures.pop().raiseException() diff --git a/synapse/http/replicationagent.py b/synapse/http/replicationagent.py index d6ba6f0e57..3ba2f22dfd 100644 --- a/synapse/http/replicationagent.py +++ b/synapse/http/replicationagent.py @@ -18,7 +18,11 @@ from typing import Dict, Optional from zope.interface import implementer from twisted.internet import defer -from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS +from twisted.internet.endpoints import ( + HostnameEndpoint, + UNIXClientEndpoint, + wrapClientTLS, +) from twisted.internet.interfaces import IStreamClientEndpoint from twisted.python.failure import Failure from twisted.web.client import URI, HTTPConnectionPool, _AgentBase @@ -32,7 +36,11 @@ from twisted.web.iweb import ( IResponse, ) -from synapse.config.workers import InstanceLocationConfig +from synapse.config.workers import ( + InstanceLocationConfig, + InstanceTcpLocationConfig, + InstanceUnixLocationConfig, +) from synapse.types import ISynapseReactor logger = logging.getLogger(__name__) @@ -40,7 +48,7 @@ logger = logging.getLogger(__name__) @implementer(IAgentEndpointFactory) class ReplicationEndpointFactory: - """Connect to a given TCP socket""" + """Connect to a given TCP or UNIX socket""" def __init__( self, @@ -64,24 +72,27 @@ class ReplicationEndpointFactory: # The given URI has a special scheme and includes the worker name. The # actual connection details are pulled from the instance map. worker_name = uri.netloc.decode("utf-8") - scheme = self.instance_map[worker_name].scheme() + location_config = self.instance_map[worker_name] + scheme = location_config.scheme() - if scheme in ("http", "https"): + if isinstance(location_config, InstanceTcpLocationConfig): endpoint = HostnameEndpoint( self.reactor, - self.instance_map[worker_name].host, - self.instance_map[worker_name].port, + location_config.host, + location_config.port, ) if scheme == "https": endpoint = wrapClientTLS( # The 'port' argument below isn't actually used by the function self.context_factory.creatorForNetloc( - self.instance_map[worker_name].host.encode("utf-8"), - self.instance_map[worker_name].port, + location_config.host.encode("utf-8"), + location_config.port, ), endpoint, ) return endpoint + elif isinstance(location_config, InstanceUnixLocationConfig): + return UNIXClientEndpoint(self.reactor, location_config.path) else: raise SchemeNotSupported(f"Unsupported scheme: {scheme}") @@ -138,13 +149,16 @@ class ReplicationAgent(_AgentBase): An existing connection from the connection pool may be used or a new one may be created. - Currently, HTTP and HTTPS schemes are supported in uri. + Currently, HTTP, HTTPS and UNIX schemes are supported in uri. This is copied from twisted.web.client.Agent, except: - * It uses a different pool key (combining the host & port). - * It does not call _ensureValidURI(...) since it breaks on some - UNIX paths. + * It uses a different pool key (combining the scheme with either host & port or + socket path). + * It does not call _ensureValidURI(...) as the strictness of IDNA2008 is not + required when using a worker's name as a 'hostname' for Synapse HTTP + Replication machinery. Specifically, this allows a range of ascii characters + such as '+' and '_' in hostnames/worker's names. See: twisted.web.iweb.IAgent.request """ @@ -154,9 +168,12 @@ class ReplicationAgent(_AgentBase): except SchemeNotSupported: return defer.fail(Failure()) + worker_name = parsedURI.netloc.decode("utf-8") + key_scheme = self._endpointFactory.instance_map[worker_name].scheme() + key_netloc = self._endpointFactory.instance_map[worker_name].netloc() # This sets the Pool key to be: - # (http(s), <host:ip>) - key = (parsedURI.scheme, parsedURI.netloc) + # (http(s), <host:port>) or (unix, <socket_path>) + key = (key_scheme, key_netloc) # _requestWithEndpoint comes from _AgentBase class return self._requestWithEndpoint( diff --git a/synapse/http/server.py b/synapse/http/server.py index e411ac7e62..f592600880 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -18,6 +18,7 @@ import html import logging import types import urllib +import urllib.parse from http import HTTPStatus from http.client import FOUND from inspect import isawaitable @@ -65,7 +66,6 @@ from synapse.api.errors import ( UnrecognizedRequestError, ) from synapse.config.homeserver import HomeServerConfig -from synapse.http.site import SynapseRequest from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background from synapse.logging.opentracing import active_span, start_active_span, trace_servlet from synapse.util import json_encoder @@ -76,6 +76,7 @@ from synapse.util.iterutils import chunk_seq if TYPE_CHECKING: import opentracing + from synapse.http.site import SynapseRequest from synapse.server import HomeServer logger = logging.getLogger(__name__) @@ -102,7 +103,7 @@ HTTP_STATUS_REQUEST_CANCELLED = 499 def return_json_error( - f: failure.Failure, request: SynapseRequest, config: Optional[HomeServerConfig] + f: failure.Failure, request: "SynapseRequest", config: Optional[HomeServerConfig] ) -> None: """Sends a JSON error response to clients.""" @@ -220,8 +221,8 @@ def return_html_error( def wrap_async_request_handler( - h: Callable[["_AsyncResource", SynapseRequest], Awaitable[None]] -) -> Callable[["_AsyncResource", SynapseRequest], "defer.Deferred[None]"]: + h: Callable[["_AsyncResource", "SynapseRequest"], Awaitable[None]] +) -> Callable[["_AsyncResource", "SynapseRequest"], "defer.Deferred[None]"]: """Wraps an async request handler so that it calls request.processing. This helps ensure that work done by the request handler after the request is completed @@ -235,7 +236,7 @@ def wrap_async_request_handler( """ async def wrapped_async_request_handler( - self: "_AsyncResource", request: SynapseRequest + self: "_AsyncResource", request: "SynapseRequest" ) -> None: with request.processing(): await h(self, request) @@ -300,7 +301,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta): self._extract_context = extract_context - def render(self, request: SynapseRequest) -> int: + def render(self, request: "SynapseRequest") -> int: """This gets called by twisted every time someone sends us a request.""" request.render_deferred = defer.ensureDeferred( self._async_render_wrapper(request) @@ -308,7 +309,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta): return NOT_DONE_YET @wrap_async_request_handler - async def _async_render_wrapper(self, request: SynapseRequest) -> None: + async def _async_render_wrapper(self, request: "SynapseRequest") -> None: """This is a wrapper that delegates to `_async_render` and handles exceptions, return values, metrics, etc. """ @@ -326,9 +327,15 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta): # of our stack, and thus gives us a sensible stack # trace. f = failure.Failure() + logger.exception( + "Error handling request", + exc_info=(f.type, f.value, f.getTracebackObject()), + ) self._send_error_response(f, request) - async def _async_render(self, request: SynapseRequest) -> Optional[Tuple[int, Any]]: + async def _async_render( + self, request: "SynapseRequest" + ) -> Optional[Tuple[int, Any]]: """Delegates to `_async_render_<METHOD>` methods, or returns a 400 if no appropriate method exists. Can be overridden in sub classes for different routing. @@ -358,7 +365,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta): @abc.abstractmethod def _send_response( self, - request: SynapseRequest, + request: "SynapseRequest", code: int, response_object: Any, ) -> None: @@ -368,7 +375,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta): def _send_error_response( self, f: failure.Failure, - request: SynapseRequest, + request: "SynapseRequest", ) -> None: raise NotImplementedError() @@ -384,7 +391,7 @@ class DirectServeJsonResource(_AsyncResource): def _send_response( self, - request: SynapseRequest, + request: "SynapseRequest", code: int, response_object: Any, ) -> None: @@ -401,7 +408,7 @@ class DirectServeJsonResource(_AsyncResource): def _send_error_response( self, f: failure.Failure, - request: SynapseRequest, + request: "SynapseRequest", ) -> None: """Implements _AsyncResource._send_error_response""" return_json_error(f, request, None) @@ -473,7 +480,7 @@ class JsonResource(DirectServeJsonResource): ) def _get_handler_for_request( - self, request: SynapseRequest + self, request: "SynapseRequest" ) -> Tuple[ServletCallback, str, Dict[str, str]]: """Finds a callback method to handle the given request. @@ -503,7 +510,7 @@ class JsonResource(DirectServeJsonResource): # Huh. No one wanted to handle that? Fiiiiiine. raise UnrecognizedRequestError(code=404) - async def _async_render(self, request: SynapseRequest) -> Tuple[int, Any]: + async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]: callback, servlet_classname, group_dict = self._get_handler_for_request(request) request.is_render_cancellable = is_function_cancellable(callback) @@ -535,7 +542,7 @@ class JsonResource(DirectServeJsonResource): def _send_error_response( self, f: failure.Failure, - request: SynapseRequest, + request: "SynapseRequest", ) -> None: """Implements _AsyncResource._send_error_response""" return_json_error(f, request, self.hs.config) @@ -551,7 +558,7 @@ class DirectServeHtmlResource(_AsyncResource): def _send_response( self, - request: SynapseRequest, + request: "SynapseRequest", code: int, response_object: Any, ) -> None: @@ -565,7 +572,7 @@ class DirectServeHtmlResource(_AsyncResource): def _send_error_response( self, f: failure.Failure, - request: SynapseRequest, + request: "SynapseRequest", ) -> None: """Implements _AsyncResource._send_error_response""" return_html_error(f, request, self.ERROR_TEMPLATE) @@ -592,7 +599,7 @@ class UnrecognizedRequestResource(resource.Resource): errcode of M_UNRECOGNIZED. """ - def render(self, request: SynapseRequest) -> int: + def render(self, request: "SynapseRequest") -> int: f = failure.Failure(UnrecognizedRequestError(code=404)) return_json_error(f, request, None) # A response has already been sent but Twisted requires either NOT_DONE_YET @@ -622,7 +629,7 @@ class RootRedirect(resource.Resource): class OptionsResource(resource.Resource): """Responds to OPTION requests for itself and all children.""" - def render_OPTIONS(self, request: SynapseRequest) -> bytes: + def render_OPTIONS(self, request: "SynapseRequest") -> bytes: request.setResponseCode(204) request.setHeader(b"Content-Length", b"0") @@ -737,7 +744,7 @@ def _encode_json_bytes(json_object: object) -> bytes: def respond_with_json( - request: SynapseRequest, + request: "SynapseRequest", code: int, json_object: Any, send_cors: bool = False, @@ -787,7 +794,7 @@ def respond_with_json( def respond_with_json_bytes( - request: SynapseRequest, + request: "SynapseRequest", code: int, json_bytes: bytes, send_cors: bool = False, @@ -825,7 +832,7 @@ def respond_with_json_bytes( async def _async_write_json_to_request_in_thread( - request: SynapseRequest, + request: "SynapseRequest", json_encoder: Callable[[Any], bytes], json_object: Any, ) -> None: @@ -883,7 +890,7 @@ def _write_bytes_to_request(request: Request, bytes_to_write: bytes) -> None: _ByteProducer(request, bytes_generator) -def set_cors_headers(request: SynapseRequest) -> None: +def set_cors_headers(request: "SynapseRequest") -> None: """Set the CORS headers so that javascript running in a web browsers can use this API @@ -981,7 +988,7 @@ def set_clickjacking_protection_headers(request: Request) -> None: def respond_with_redirect( - request: SynapseRequest, url: bytes, statusCode: int = FOUND, cors: bool = False + request: "SynapseRequest", url: bytes, statusCode: int = FOUND, cors: bool = False ) -> None: """ Write a 302 (or other specified status code) response to the request, if it is still alive. diff --git a/synapse/http/site.py b/synapse/http/site.py index 5b5a7c1e59..a388d6cf7f 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -21,25 +21,29 @@ from zope.interface import implementer from twisted.internet.address import UNIXAddress from twisted.internet.defer import Deferred -from twisted.internet.interfaces import IAddress, IReactorTime +from twisted.internet.interfaces import IAddress from twisted.python.failure import Failure from twisted.web.http import HTTPChannel from twisted.web.resource import IResource, Resource -from twisted.web.server import Request, Site +from twisted.web.server import Request from synapse.config.server import ListenerConfig from synapse.http import get_request_user_agent, redact_uri +from synapse.http.proxy import ProxySite from synapse.http.request_metrics import RequestMetrics, requests_counter from synapse.logging.context import ( ContextRequest, LoggingContext, PreserveLoggingContext, ) -from synapse.types import Requester +from synapse.types import ISynapseReactor, Requester if TYPE_CHECKING: import opentracing + from synapse.server import HomeServer + + logger = logging.getLogger(__name__) _next_request_seq = 0 @@ -102,7 +106,7 @@ class SynapseRequest(Request): # A boolean indicating whether `render_deferred` should be cancelled if the # client disconnects early. Expected to be set by the coroutine started by # `Resource.render`, if rendering is asynchronous. - self.is_render_cancellable = False + self.is_render_cancellable: bool = False global _next_request_seq self.request_seq = _next_request_seq @@ -601,7 +605,7 @@ class _XForwardedForAddress: host: str -class SynapseSite(Site): +class SynapseSite(ProxySite): """ Synapse-specific twisted http Site @@ -623,7 +627,8 @@ class SynapseSite(Site): resource: IResource, server_version_string: str, max_request_body_size: int, - reactor: IReactorTime, + reactor: ISynapseReactor, + hs: "HomeServer", ): """ @@ -638,7 +643,11 @@ class SynapseSite(Site): dropping the connection reactor: reactor to be used to manage connection timeouts """ - Site.__init__(self, resource, reactor=reactor) + super().__init__( + resource=resource, + reactor=reactor, + hs=hs, + ) self.site_tag = site_tag self.reactor = reactor @@ -649,7 +658,9 @@ class SynapseSite(Site): request_id_header = config.http_options.request_id_header - self.experimental_cors_msc3886 = config.http_options.experimental_cors_msc3886 + self.experimental_cors_msc3886: bool = ( + config.http_options.experimental_cors_msc3886 + ) def request_factory(channel: HTTPChannel, queued: bool) -> Request: return request_class( diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 75217e3f45..be910128aa 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -1070,7 +1070,7 @@ def trace_servlet( tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER, tags.HTTP_METHOD: request.get_method(), tags.HTTP_URL: request.get_redacted_uri(), - tags.PEER_HOST_IPV6: request.getClientAddress().host, + tags.PEER_HOST_IPV6: request.get_client_ip_if_available(), } request_name = request.request_metrics.name @@ -1091,9 +1091,11 @@ def trace_servlet( # with JsonResource). scope.span.set_operation_name(request.request_metrics.name) + # Mypy seems to think that start_context.tag below can be Optional[str], but + # that doesn't appear to be correct and works in practice. request_tags[ SynapseTags.REQUEST_TAG - ] = request.request_metrics.start_context.tag + ] = request.request_metrics.start_context.tag # type: ignore[assignment] # set the tags *after* the servlet completes, in case it decided to # prioritise the span (tags will get dropped on unprioritised spans) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 67377c647b..990c079c81 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -375,7 +375,7 @@ class BulkPushRuleEvaluator: # _get_power_levels_and_sender_level in its call to get_user_power_level # (even for room V10.) notification_levels = power_levels.get("notifications", {}) - if not event.room_version.msc3667_int_only_power_levels: + if not event.room_version.enforce_int_power_levels: keys = list(notification_levels.keys()) for key in keys: level = notification_levels.get(key, SENTINEL) diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index 7ee07e4bee..a94a6e97c1 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -13,6 +13,7 @@ # limitations under the License. from typing import Dict +from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase from synapse.push.presentable_names import calculate_room_name, name_from_member_event from synapse.storage.controllers import StorageControllers @@ -49,7 +50,41 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) - async def get_context_for_event( storage: StorageControllers, ev: EventBase, user_id: str ) -> Dict[str, str]: - ctx = {} + ctx: Dict[str, str] = {} + + if ev.internal_metadata.outlier: + # We don't have state for outliers, so we can't compute the context + # except for invites and knocks. (Such events are known as 'out-of-band + # memberships' for the user). + if ev.type != EventTypes.Member: + return ctx + + # We might be able to pull out the display name for the sender straight + # from the membership event + event_display_name = ev.content.get("displayname") + if event_display_name and ev.state_key == ev.sender: + ctx["sender_display_name"] = event_display_name + + room_state = [] + if ev.content.get("membership") == Membership.INVITE: + room_state = ev.unsigned.get("invite_room_state", []) + elif ev.content.get("membership") == Membership.KNOCK: + room_state = ev.unsigned.get("knock_room_state", []) + + # Ideally we'd reuse the logic in `calculate_room_name`, but that gets + # complicated to handle partial events vs pulling events from the DB. + for state_dict in room_state: + type_tuple = (state_dict["type"], state_dict.get("state_key")) + if type_tuple == (EventTypes.Member, ev.sender): + display_name = state_dict["content"].get("displayname") + if display_name: + ctx["sender_display_name"] = display_name + elif type_tuple == (EventTypes.Name, ""): + room_name = state_dict["content"].get("name") + if room_name: + ctx["name"] = room_name + + return ctx room_state_ids = await storage.state.get_state_ids_for_event(ev.event_id) diff --git a/synapse/rest/client/register.py b/synapse/rest/client/register.py index d59669f0b6..77e3b91b79 100644 --- a/synapse/rest/client/register.py +++ b/synapse/rest/client/register.py @@ -462,9 +462,9 @@ class RegisterRestServlet(RestServlet): # the auth layer will store these in sessions. desired_username = None if "username" in body: - if not isinstance(body["username"], str) or len(body["username"]) > 512: - raise SynapseError(400, "Invalid username") desired_username = body["username"] + if not isinstance(desired_username, str) or len(desired_username) > 512: + raise SynapseError(400, "Invalid username") # fork off as soon as possible for ASes which have completely # different registration flows to normal users @@ -477,11 +477,6 @@ class RegisterRestServlet(RestServlet): "Appservice token must be provided when using a type of m.login.application_service", ) - # Set the desired user according to the AS API (which uses the - # 'user' key not 'username'). Since this is a new addition, we'll - # fallback to 'username' if they gave one. - desired_username = body.get("user", desired_username) - # XXX we should check that desired_username is valid. Currently # we give appservices carte blanche for any insanity in mxids, # because the IRC bridges rely on being able to register stupid @@ -489,7 +484,8 @@ class RegisterRestServlet(RestServlet): access_token = self.auth.get_access_token_from_request(request) - if not isinstance(desired_username, str): + # Desired username is either a string or None. + if desired_username is None: raise SynapseError(400, "Desired Username is missing or not a string") result = await self._do_appservice_registration( diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 951bd033f5..dc498001e4 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -1117,7 +1117,7 @@ class RoomRedactEventRestServlet(TransactionRestServlet): # Ensure the redacts property in the content matches the one provided in # the URL. room_version = await self._store.get_room_version(room_id) - if room_version.msc2176_redaction_rules: + if room_version.updated_redaction_rules: if "redacts" in content and content["redacts"] != event_id: raise SynapseError( 400, @@ -1151,7 +1151,7 @@ class RoomRedactEventRestServlet(TransactionRestServlet): "sender": requester.user.to_string(), } # Earlier room versions had a top-level redacts property. - if not room_version.msc2176_redaction_rules: + if not room_version.updated_redaction_rules: event_dict["redacts"] = event_id ( diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index ca8be8c80d..830658f328 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -2136,7 +2136,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): raise StoreError(400, "No create event in state") # Before MSC2175, the room creator was a separate field. - if not room_version.msc2175_implicit_room_creator: + if not room_version.implicit_room_creator: room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR) if not isinstance(room_creator, str): diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index b0a06baf4f..924022c95c 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -62,7 +62,6 @@ from synapse.types import ( get_domain_from_id, get_localpart_from_id, ) -from synapse.util.caches.descriptors import cached logger = logging.getLogger(__name__) @@ -771,9 +770,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): # This should be unreachable. raise Exception("Unrecognized database engine") - for p in profiles: - txn.call_after(self.get_user_in_directory.invalidate, (p.user_id,)) - async def add_users_who_share_private_room( self, room_id: str, user_id_tuples: Iterable[Tuple[str, str]] ) -> None: @@ -831,14 +827,12 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): txn.execute(f"{truncate} user_directory_search") txn.execute(f"{truncate} users_in_public_rooms") txn.execute(f"{truncate} users_who_share_private_rooms") - txn.call_after(self.get_user_in_directory.invalidate_all) await self.db_pool.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn ) - @cached() - async def get_user_in_directory(self, user_id: str) -> Optional[Mapping[str, str]]: + async def _get_user_in_directory(self, user_id: str) -> Optional[Mapping[str, str]]: return await self.db_pool.simple_select_one( table="user_directory", keyvalues={"user_id": user_id}, @@ -900,7 +894,6 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): table="users_who_share_private_rooms", keyvalues={"other_user_id": user_id}, ) - txn.call_after(self.get_user_in_directory.invalidate, (user_id,)) await self.db_pool.runInteraction( "remove_from_user_dir", _remove_from_user_dir_txn diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 095be070e0..fdfd465c8d 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -348,22 +348,15 @@ class EventID(DomainSpecificString): SIGIL = "$" -mxid_localpart_allowed_characters = set( - "_-./=" + string.ascii_lowercase + string.digits +MXID_LOCALPART_ALLOWED_CHARACTERS = set( + "_-./=+" + string.ascii_lowercase + string.digits ) -# MSC4007 adds the + to the allowed characters. -# -# TODO If this was accepted, update the SSO code to support this, see the callers -# of map_username_to_mxid_localpart. -extended_mxid_localpart_allowed_characters = mxid_localpart_allowed_characters | {"+"} # Guest user IDs are purely numeric. GUEST_USER_ID_PATTERN = re.compile(r"^\d+$") -def contains_invalid_mxid_characters( - localpart: str, use_extended_character_set: bool -) -> bool: +def contains_invalid_mxid_characters(localpart: str) -> bool: """Check for characters not allowed in an mxid or groupid localpart Args: @@ -374,12 +367,7 @@ def contains_invalid_mxid_characters( Returns: True if there are any naughty characters """ - allowed_characters = ( - extended_mxid_localpart_allowed_characters - if use_extended_character_set - else mxid_localpart_allowed_characters - ) - return any(c not in allowed_characters for c in localpart) + return any(c not in MXID_LOCALPART_ALLOWED_CHARACTERS for c in localpart) UPPER_CASE_PATTERN = re.compile(b"[A-Z_]") @@ -396,7 +384,7 @@ UPPER_CASE_PATTERN = re.compile(b"[A-Z_]") # bytes rather than strings # NON_MXID_CHARACTER_PATTERN = re.compile( - ("[^%s]" % (re.escape("".join(mxid_localpart_allowed_characters - {"="})),)).encode( + ("[^%s]" % (re.escape("".join(MXID_LOCALPART_ALLOWED_CHARACTERS - {"="})),)).encode( "ascii" ) ) |