diff --git a/synapse/third_party_rules/access_rules.py b/synapse/third_party_rules/access_rules.py
index 2c9155d15c..d4e57316de 100644
--- a/synapse/third_party_rules/access_rules.py
+++ b/synapse/third_party_rules/access_rules.py
@@ -12,25 +12,31 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import email.utils
+from typing import Dict, List, Optional, Tuple
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership, RoomCreationPreset
from synapse.api.errors import SynapseError
from synapse.config._base import ConfigError
-from synapse.types import get_domain_from_id
+from synapse.events import EventBase
+from synapse.http.client import SimpleHttpClient
+from synapse.types import Requester, StateMap, get_domain_from_id
ACCESS_RULES_TYPE = "im.vector.room.access_rules"
-ACCESS_RULE_RESTRICTED = "restricted"
-ACCESS_RULE_UNRESTRICTED = "unrestricted"
-ACCESS_RULE_DIRECT = "direct"
+
+
+class AccessRules:
+ DIRECT = "direct"
+ RESTRICTED = "restricted"
+ UNRESTRICTED = "unrestricted"
+
VALID_ACCESS_RULES = (
- ACCESS_RULE_DIRECT,
- ACCESS_RULE_RESTRICTED,
- ACCESS_RULE_UNRESTRICTED,
+ AccessRules.DIRECT,
+ AccessRules.RESTRICTED,
+ AccessRules.UNRESTRICTED,
)
# Rules to which we need to apply the power levels restrictions.
@@ -44,7 +50,7 @@ VALID_ACCESS_RULES = (
# * the default power level for users (users_default) being set to anything other than 0.
# * a non-default power level being assigned to any user which would be forbidden from
# joining a restricted room.
-RULES_WITH_RESTRICTED_POWER_LEVELS = (ACCESS_RULE_UNRESTRICTED,)
+RULES_WITH_RESTRICTED_POWER_LEVELS = (AccessRules.UNRESTRICTED,)
class RoomAccessRules(object):
@@ -68,7 +74,7 @@ class RoomAccessRules(object):
Don't forget to consider if you can invite users from your own domain.
"""
- def __init__(self, config, http_client):
+ def __init__(self, config: Dict, http_client: SimpleHttpClient):
self.http_client = http_client
self.id_server = config["id_server"]
@@ -78,18 +84,42 @@ class RoomAccessRules(object):
)
@staticmethod
- def parse_config(config):
- if "id_server" in config:
- return config
- else:
+ def parse_config(config: Dict) -> Dict:
+ """Parses and validates the options specified in the homeserver config.
+
+ Args:
+ config: The config dict.
+
+ Returns:
+ The config dict.
+
+ Raises:
+ ConfigError: If there was an issue with the provided module configuration.
+ """
+ if "id_server" not in config:
raise ConfigError("No IS for event rules TchapEventRules")
- def on_create_room(self, requester, config, is_requester_admin) -> bool:
- """Implements synapse.events.ThirdPartyEventRules.on_create_room
+ return config
+
+ def on_create_room(
+ self, requester: Requester, config: Dict, is_requester_admin: bool,
+ ) -> bool:
+ """Implements synapse.events.ThirdPartyEventRules.on_create_room.
Checks if a im.vector.room.access_rules event is being set during room creation.
If yes, make sure the event is correct. Otherwise, append an event with the
default rule to the initial state.
+
+ Args:
+ requester: The user who is making the createRoom request.
+ config: The createRoom config dict provided by the user.
+ is_requester_admin: Whether the requester is a Synapse admin.
+
+ Returns:
+ Whether the request is allowed.
+
+ Raises:
+ SynapseError: If the createRoom config dict is invalid or its contents blocked.
"""
is_direct = config.get("is_direct")
preset = config.get("preset")
@@ -110,9 +140,8 @@ class RoomAccessRules(object):
if access_rule not in VALID_ACCESS_RULES:
raise SynapseError(400, "Invalid access rule")
- # Make sure the rule is "direct" if the room is a direct chat.
- if (is_direct and access_rule != ACCESS_RULE_DIRECT) or (
- access_rule == ACCESS_RULE_DIRECT and not is_direct
+ if (is_direct and access_rule != AccessRules.DIRECT) or (
+ access_rule == AccessRules.DIRECT and not is_direct
):
raise SynapseError(400, "Invalid access rule")
@@ -123,12 +152,12 @@ class RoomAccessRules(object):
# If there's no access rules event in the initial state, create one with the
# default setting.
if is_direct:
- default_rule = ACCESS_RULE_DIRECT
+ default_rule = AccessRules.DIRECT
else:
# If the default value for non-direct chat changes, we should make another
# case here for rooms created with either a "public" join_rule or the
# "public_chat" preset to make sure those keep defaulting to "restricted"
- default_rule = ACCESS_RULE_RESTRICTED
+ default_rule = AccessRules.RESTRICTED
if not config.get("initial_state"):
config["initial_state"] = []
@@ -148,7 +177,7 @@ class RoomAccessRules(object):
# a "public" join rule, the access rule must be "restricted").
if (
join_rule == JoinRules.PUBLIC or preset == RoomCreationPreset.PUBLIC_CHAT
- ) and access_rule != ACCESS_RULE_RESTRICTED:
+ ) and access_rule != AccessRules.RESTRICTED:
raise SynapseError(400, "Invalid access rule")
# Check if the creator can override values for the power levels.
@@ -170,30 +199,41 @@ class RoomAccessRules(object):
return True
@defer.inlineCallbacks
- def check_threepid_can_be_invited(self, medium, address, state_events):
- """Implements synapse.events.ThirdPartyEventRules.check_threepid_can_be_invited
+ def check_threepid_can_be_invited(
+ self, medium: str, address: str, state_events: StateMap[EventBase],
+ ) -> bool:
+ """Implements synapse.events.ThirdPartyEventRules.check_threepid_can_be_invited.
Check if a threepid can be invited to the room via a 3PID invite given the current
rules and the threepid's address, by retrieving the HS it's mapped to from the
configured identity server, and checking if we can invite users from it.
+
+ Args:
+ medium: The medium of the threepid.
+ address: The address of the threepid.
+ state_events: A dict mapping (event type, state key) to state event.
+ State events in the room the threepid is being invited to.
+
+ Returns:
+ Whether the threepid invite is allowed.
"""
rule = self._get_rule_from_state(state_events)
if medium != "email":
- defer.returnValue(False)
+ return False
- if rule != ACCESS_RULE_RESTRICTED:
+ if rule != AccessRules.RESTRICTED:
# Only "restricted" requires filtering 3PID invites. We don't need to do
# anything for "direct" here, because only "restricted" requires filtering
# based on the HS the address is mapped to.
- defer.returnValue(True)
+ return True
parsed_address = email.utils.parseaddr(address)[1]
if parsed_address != address:
# Avoid reproducing the security issue described here:
# https://matrix.org/blog/2019/04/18/security-update-sydent-1-0-2
# It's probably not worth it but let's just be overly safe here.
- defer.returnValue(False)
+ return False
# Get the HS this address belongs to from the identity server.
res = yield self.http_client.get_json(
@@ -203,17 +243,27 @@ class RoomAccessRules(object):
# Look for a domain that's not forbidden from being invited.
if not res.get("hs"):
- defer.returnValue(False)
+ return False
if res.get("hs") in self.domains_forbidden_when_restricted:
- defer.returnValue(False)
+ return False
- defer.returnValue(True)
+ return True
- def check_event_allowed(self, event, state_events):
- """Implements synapse.events.ThirdPartyEventRules.check_event_allowed
+ def check_event_allowed(
+ self, event: EventBase, state_events: StateMap[EventBase],
+ ) -> bool:
+ """Implements synapse.events.ThirdPartyEventRules.check_event_allowed.
Checks the event's type and the current rule and calls the right function to
determine whether the event can be allowed.
+
+ Args:
+ event: The event to check.
+ state_events: A dict mapping (event type, state key) to state event.
+ State events in the room the event originated from.
+
+ Returns:
+ True if the event can be allowed, False otherwise.
"""
if event.type == ACCESS_RULES_TYPE:
return self._on_rules_change(event, state_events)
@@ -241,16 +291,19 @@ class RoomAccessRules(object):
return True
- def _on_rules_change(self, event, state_events):
+ def _on_rules_change(
+ self, event: EventBase, state_events: StateMap[EventBase],
+ ) -> bool:
"""Implement the checks and behaviour specified on allowing or forbidding a new
im.vector.room.access_rules event.
Args:
- event (synapse.events.EventBase): The event to check.
- state_events (dict[tuple[event type, state key], EventBase]): The state of the
- room before the event was sent.
+ event: The event to check.
+ state_events: A dict mapping (event type, state key) to state event.
+ State events in the room before the event was sent.
+
Returns:
- bool, True if the event can be allowed, False otherwise.
+ True if the event can be allowed, False otherwise.
"""
new_rule = event.content.get("rule")
@@ -261,11 +314,11 @@ class RoomAccessRules(object):
# We must not allow rooms with the "public" join rule to be given any other access
# rule than "restricted".
join_rule = self._get_join_rule_from_state(state_events)
- if join_rule == JoinRules.PUBLIC and new_rule != ACCESS_RULE_RESTRICTED:
+ if join_rule == JoinRules.PUBLIC and new_rule != AccessRules.RESTRICTED:
return False
# Make sure we don't apply "direct" if the room has more than two members.
- if new_rule == ACCESS_RULE_DIRECT:
+ if new_rule == AccessRules.DIRECT:
existing_members, threepid_tokens = self._get_members_and_tokens_from_state(
state_events
)
@@ -283,28 +336,30 @@ class RoomAccessRules(object):
prev_rule = prev_rules_event.content.get("rule")
# Currently, we can only go from "restricted" to "unrestricted".
- if prev_rule == ACCESS_RULE_RESTRICTED and new_rule == ACCESS_RULE_UNRESTRICTED:
- return True
-
- return False
+ return (
+ prev_rule == AccessRules.RESTRICTED and new_rule == AccessRules.UNRESTRICTED
+ )
- def _on_membership_or_invite(self, event, rule, state_events):
+ def _on_membership_or_invite(
+ self, event: EventBase, rule: str, state_events: StateMap[EventBase],
+ ) -> bool:
"""Applies the correct rule for incoming m.room.member and
m.room.third_party_invite events.
Args:
- event (synapse.events.EventBase): The event to check.
- rule (str): The name of the rule to apply.
- state_events (dict[tuple[event type, state key], EventBase]): The state of the
- room before the event was sent.
+ event: The event to check.
+ rule: The name of the rule to apply.
+ state_events: A dict mapping (event type, state key) to state event.
+ The state of the room before the event was sent.
+
Returns:
- bool, True if the event can be allowed, False otherwise.
+ True if the event can be allowed, False otherwise.
"""
- if rule == ACCESS_RULE_RESTRICTED:
+ if rule == AccessRules.RESTRICTED:
ret = self._on_membership_or_invite_restricted(event)
- elif rule == ACCESS_RULE_UNRESTRICTED:
+ elif rule == AccessRules.UNRESTRICTED:
ret = self._on_membership_or_invite_unrestricted()
- elif rule == ACCESS_RULE_DIRECT:
+ elif rule == AccessRules.DIRECT:
ret = self._on_membership_or_invite_direct(event, state_events)
else:
# We currently apply the default (restricted) if we don't know the rule, we
@@ -313,16 +368,17 @@ class RoomAccessRules(object):
return ret
- def _on_membership_or_invite_restricted(self, event):
+ def _on_membership_or_invite_restricted(self, event: EventBase) -> bool:
"""Implements the checks and behaviour specified for the "restricted" rule.
"restricted" currently means that users can only invite users if their server is
included in a limited list of domains.
Args:
- event (synapse.events.EventBase): The event to check.
+ event: The event to check.
+
Returns:
- bool, True if the event can be allowed, False otherwise.
+ True if the event can be allowed, False otherwise.
"""
# We're not applying the rules on m.room.third_party_member events here because
# the filtering on threepids is done in check_threepid_can_be_invited, which is
@@ -340,29 +396,31 @@ class RoomAccessRules(object):
invitee_domain = get_domain_from_id(event.state_key)
return invitee_domain not in self.domains_forbidden_when_restricted
- def _on_membership_or_invite_unrestricted(self):
+ def _on_membership_or_invite_unrestricted(self) -> bool:
"""Implements the checks and behaviour specified for the "unrestricted" rule.
"unrestricted" currently means that every event is allowed.
Returns:
- bool, True if the event can be allowed, False otherwise.
+ True if the event can be allowed, False otherwise.
"""
return True
- def _on_membership_or_invite_direct(self, event, state_events):
+ def _on_membership_or_invite_direct(
+ self, event: EventBase, state_events: StateMap[EventBase],
+ ) -> bool:
"""Implements the checks and behaviour specified for the "direct" rule.
"direct" currently means that no member is allowed apart from the two initial
- members the room was created for (i.e. the room's creator and their first
- invitee).
+ members the room was created for (i.e. the room's creator and their first invitee).
Args:
- event (synapse.events.EventBase): The event to check.
- state_events (dict[tuple[event type, state key], EventBase]): The state of the
- room before the event was sent.
+ event: The event to check.
+ state_events: A dict mapping (event type, state key) to state event.
+ The state of the room before the event was sent.
+
Returns:
- bool, True if the event can be allowed, False otherwise.
+ True if the event can be allowed, False otherwise.
"""
# Get the room memberships and 3PID invite tokens from the room's state.
existing_members, threepid_tokens = self._get_members_and_tokens_from_state(
@@ -406,14 +464,11 @@ class RoomAccessRules(object):
is_from_threepid_invite = self._is_invite_from_threepid(
event, threepid_tokens[0]
)
- if is_from_threepid_invite or target == existing_members[0]:
- return True
-
- return False
+ return is_from_threepid_invite or target == existing_members[0]
return True
- def _is_power_level_content_allowed(self, content, access_rule):
+ def _is_power_level_content_allowed(self, content: Dict, access_rule: str) -> bool:
"""Check if a given power levels event is permitted under the given access rule.
It shouldn't be allowed if it either changes the default PL to a non-0 value or
@@ -421,10 +476,11 @@ class RoomAccessRules(object):
under a more restrictive access rule.
Args:
- content (dict[]): The content of the m.room.power_levels event to check.
- access_rule (str): The access rule in place in this room.
+ content: The content of the m.room.power_levels event to check.
+ access_rule: The access rule in place in this room.
+
Returns:
- bool, True if the event can be allowed, False otherwise.
+ Whether the content of the power levels event is valid.
"""
# Check if we need to apply the restrictions with the current rule.
if access_rule not in RULES_WITH_RESTRICTED_POWER_LEVELS:
@@ -448,10 +504,11 @@ class RoomAccessRules(object):
return True
- def _on_join_rule_change(self, event, rule):
+ def _on_join_rule_change(self, event: EventBase, rule: str) -> bool:
"""Check whether a join rule change is allowed. A join rule change is always
allowed unless the new join rule is "public" and the current access rule isn't
"restricted".
+
The rationale is that external users (those whose server would be denied access
to rooms enforcing the "restricted" access rule) should always rely on non-
external users for access to rooms, therefore they shouldn't be able to access
@@ -465,100 +522,107 @@ class RoomAccessRules(object):
that in mind if we need to change the default access rule in the future.
Args:
- event (synapse.events.EventBase): The event to check.
- rule (str): The name of the rule to apply.
+ event: The event to check.
+ rule: The name of the rule to apply.
+
Returns:
- bool, True if the event can be allowed, False otherwise.
+ Whether the change is allowed.
"""
if event.content.get("join_rule") == JoinRules.PUBLIC:
- return rule == ACCESS_RULE_RESTRICTED
+ return rule == AccessRules.RESTRICTED
return True
- def _on_room_avatar_change(self, event, rule):
+ def _on_room_avatar_change(self, event: EventBase, rule: str) -> bool:
"""Check whether a change of room avatar is allowed.
The current rule is to forbid such a change in direct chats but allow it
everywhere else.
Args:
- event (synapse.events.EventBase): The event to check.
- rule (str): The name of the rule to apply.
+ event: The event to check.
+ rule: The name of the rule to apply.
+
Returns:
- bool, True if the event can be allowed, False otherwise.
+ True if the event can be allowed, False otherwise.
"""
- return rule != ACCESS_RULE_DIRECT
+ return rule != AccessRules.DIRECT
- def _on_room_name_change(self, event, rule):
+ def _on_room_name_change(self, event: EventBase, rule: str) -> bool:
"""Check whether a change of room name is allowed.
The current rule is to forbid such a change in direct chats but allow it
everywhere else.
Args:
- event (synapse.events.EventBase): The event to check.
- rule (str): The name of the rule to apply.
+ event: The event to check.
+ rule: The name of the rule to apply.
+
Returns:
- bool, True if the event can be allowed, False otherwise.
+ True if the event can be allowed, False otherwise.
"""
- return rule != ACCESS_RULE_DIRECT
+ return rule != AccessRules.DIRECT
- def _on_room_topic_change(self, event, rule):
+ def _on_room_topic_change(self, event: EventBase, rule: str) -> bool:
"""Check whether a change of room topic is allowed.
The current rule is to forbid such a change in direct chats but allow it
everywhere else.
Args:
- event (synapse.events.EventBase): The event to check.
- rule (str): The name of the rule to apply.
+ event: The event to check.
+ rule: The name of the rule to apply.
+
Returns:
- bool, True if the event can be allowed, False otherwise.
+ True if the event can be allowed, False otherwise.
"""
- return rule != ACCESS_RULE_DIRECT
+ return rule != AccessRules.DIRECT
@staticmethod
- def _get_rule_from_state(state_events):
+ def _get_rule_from_state(state_events: StateMap[EventBase]) -> Optional[str]:
"""Extract the rule to be applied from the given set of state events.
Args:
- state_events (dict[tuple[event type, state key], EventBase]): The set of state
- events.
+ state_events: A dict mapping (event type, state key) to state event.
+
Returns:
- str, the name of the rule (either "direct", "restricted" or "unrestricted")
+ The name of the rule (either "direct", "restricted" or "unrestricted") if found,
+ else None.
"""
access_rules = state_events.get((ACCESS_RULES_TYPE, ""))
if access_rules is None:
- rule = ACCESS_RULE_RESTRICTED
- else:
- rule = access_rules.content.get("rule")
- return rule
+ return AccessRules.RESTRICTED
+
+ return access_rules.content.get("rule")
@staticmethod
- def _get_join_rule_from_state(state_events):
+ def _get_join_rule_from_state(state_events: StateMap[EventBase]) -> Optional[str]:
"""Extract the room's join rule from the given set of state events.
Args:
state_events (dict[tuple[event type, state key], EventBase]): The set of state
events.
+
Returns:
- str, the name of the join rule (either "public", or "invite")
+ The name of the join rule (either "public", or "invite") if found, else None.
"""
join_rule_event = state_events.get((EventTypes.JoinRules, ""))
if join_rule_event is None:
return None
+
return join_rule_event.content.get("join_rule")
@staticmethod
- def _get_members_and_tokens_from_state(state_events):
- """Retrieves from a list of state events the list of users that have a
- m.room.member event in the room, and the tokens of 3PID invites in the room.
+ def _get_members_and_tokens_from_state(
+ state_events: StateMap[EventBase],
+ ) -> Tuple[List[str], List[str]]:
+ """Retrieves the list of users that have a m.room.member event in the room,
+ as well as 3PID invites tokens in the room.
Args:
- state_events (dict[tuple[event type, state key], EventBase]): The set of state
- events.
+ state_events: A dict mapping (event type, state key) to state event.
+
Returns:
- existing_members (list[str]): List of targets of the m.room.member events in
- the state.
- threepid_invite_tokens (list[str]): List of tokens of the 3PID invites in the
- state.
+ A tuple containing the:
+ * targets of the m.room.member events in the state.
+ * 3PID invite tokens in the state.
"""
existing_members = []
threepid_invite_tokens = []
@@ -572,12 +636,15 @@ class RoomAccessRules(object):
return existing_members, threepid_invite_tokens
@staticmethod
- def _is_invite_from_threepid(invite, threepid_invite_token):
+ def _is_invite_from_threepid(invite: EventBase, threepid_invite_token: str) -> bool:
"""Checks whether the given invite follows the given 3PID invite.
Args:
- invite (EventBase): The m.room.member event with "invite" membership.
- threepid_invite_token (str): The state key from the 3PID invite.
+ invite: The m.room.member event with "invite" membership.
+ threepid_invite_token: The state key from the 3PID invite.
+
+ Returns:
+ Whether the invite is due to the given 3PID invite.
"""
token = (
invite.content.get("third_party_invite", {})
|