summary refs log tree commit diff
path: root/synapse/storage/invite_rule.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/storage/invite_rule.py110
1 files changed, 110 insertions, 0 deletions
diff --git a/synapse/storage/invite_rule.py b/synapse/storage/invite_rule.py
new file mode 100644

index 0000000000..b9d9d1eb62 --- /dev/null +++ b/synapse/storage/invite_rule.py
@@ -0,0 +1,110 @@ +import logging +from enum import Enum +from typing import Optional, Pattern + +from matrix_common.regex import glob_to_regex + +from synapse.types import JsonMapping, UserID + +logger = logging.getLogger(__name__) + + +class InviteRule(Enum): + """Enum to define the action taken when an invite matches a rule.""" + + ALLOW = "allow" + BLOCK = "block" + IGNORE = "ignore" + + +class InviteRulesConfig: + """Class to determine if a given user permits an invite from another user, and the action to take.""" + + def __init__(self, account_data: Optional[JsonMapping]): + self.allowed_users: list[Pattern[str]] = [] + self.ignored_users: list[Pattern[str]] = [] + self.blocked_users: list[Pattern[str]] = [] + + self.allowed_servers: list[Pattern[str]] = [] + self.ignored_servers: list[Pattern[str]] = [] + self.blocked_servers: list[Pattern[str]] = [] + + def process_field( + values: Optional[list[str]], + ruleset: list[Pattern[str]], + rule: InviteRule, + ) -> None: + if isinstance(values, list): + for value in values: + if isinstance(value, str) and len(value) > 0: + # User IDs cannot exceed 255 bytes. Don't process large, potentially + # expensive glob patterns. + if len(value) > 255: + logger.debug( + "Ignoring invite config glob pattern that is >255 bytes: {value}" + ) + continue + + try: + ruleset.append(glob_to_regex(value)) + except Exception as e: + # If for whatever reason we can't process this, just ignore it. + logger.debug( + f"Could not process '{value}' field of invite rule config, ignoring: {e}" + ) + + if account_data: + process_field( + account_data.get("allowed_users"), self.allowed_users, InviteRule.ALLOW + ) + process_field( + account_data.get("ignored_users"), self.ignored_users, InviteRule.IGNORE + ) + process_field( + account_data.get("blocked_users"), self.blocked_users, InviteRule.BLOCK + ) + process_field( + account_data.get("allowed_servers"), + self.allowed_servers, + InviteRule.ALLOW, + ) + process_field( + account_data.get("ignored_servers"), + self.ignored_servers, + InviteRule.IGNORE, + ) + process_field( + account_data.get("blocked_servers"), + self.blocked_servers, + InviteRule.BLOCK, + ) + + def get_invite_rule(self, user_id: str) -> InviteRule: + """Get the invite rule that matches this user. Will return InviteRule.ALLOW if no rules match + + Args: + user_id: The user ID of the inviting user. + + """ + user = UserID.from_string(user_id) + # The order here is important. We always process user rules before server rules + # and we always process in the order of Allow, Ignore, Block. + for patterns, rule in [ + (self.allowed_users, InviteRule.ALLOW), + (self.ignored_users, InviteRule.IGNORE), + (self.blocked_users, InviteRule.BLOCK), + ]: + for regex in patterns: + if regex.match(user_id): + return rule + + for patterns, rule in [ + (self.allowed_servers, InviteRule.ALLOW), + (self.ignored_servers, InviteRule.IGNORE), + (self.blocked_servers, InviteRule.BLOCK), + ]: + for regex in patterns: + if regex.match(user.domain): + return rule + + return InviteRule.ALLOW