summary refs log tree commit diff
path: root/synapse/storage/invite_rule.py
blob: b9d9d1eb6278f88b0bcf5ba0b6475ef734b1bd4a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import logging
from enum import Enum
from typing import Optional, Pattern

from matrix_common.regex import glob_to_regex

from synapse.types import JsonMapping, UserID

# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)


class InviteRule(Enum):
    """The action to take for an invite once it has been matched against a rule.

    Each member's value is the lowercase string form of its name.
    """

    ALLOW = "allow"
    BLOCK = "block"
    IGNORE = "ignore"


class InviteRulesConfig:
    """Class to determine if a given user permits an invite from another user, and the action to take.

    The rules are read from an account data mapping using the keys
    `allowed_users`, `ignored_users`, `blocked_users`, `allowed_servers`,
    `ignored_servers` and `blocked_servers`. Each key is expected to hold a list
    of glob strings; every usable glob is compiled once, at construction time.
    """

    def __init__(self, account_data: Optional[JsonMapping]):
        """Compile the invite rules found in `account_data`.

        Malformed input never raises: a field that is not a list, an entry that
        is not a non-empty string, an over-long entry, or a glob that fails to
        compile is skipped (the latter two with a debug log line).

        Args:
            account_data: The invite rule configuration, or None (or an empty
                mapping) if there is none, in which case every ruleset stays empty.
        """
        self.allowed_users: list[Pattern[str]] = []
        self.ignored_users: list[Pattern[str]] = []
        self.blocked_users: list[Pattern[str]] = []

        self.allowed_servers: list[Pattern[str]] = []
        self.ignored_servers: list[Pattern[str]] = []
        self.blocked_servers: list[Pattern[str]] = []

        def process_field(values: object, ruleset: list[Pattern[str]]) -> None:
            """Compile each usable glob in `values` and append it to `ruleset`."""
            if not isinstance(values, list):
                return

            for value in values:
                if not isinstance(value, str) or not value:
                    continue

                # User IDs cannot exceed 255 bytes. Don't process large, potentially
                # expensive glob patterns.
                # NOTE: `len()` counts characters rather than encoded bytes, so
                # this is a cap on the pattern's character count.
                if len(value) > 255:
                    logger.debug(
                        "Ignoring invite config glob pattern that is >255 bytes: %s",
                        value,
                    )
                    continue

                try:
                    ruleset.append(glob_to_regex(value))
                except Exception as e:
                    # If for whatever reason we can't process this, just ignore it.
                    logger.debug(
                        "Could not process '%s' field of invite rule config, ignoring: %s",
                        value,
                        e,
                    )

        if account_data:
            for key, ruleset in (
                ("allowed_users", self.allowed_users),
                ("ignored_users", self.ignored_users),
                ("blocked_users", self.blocked_users),
                ("allowed_servers", self.allowed_servers),
                ("ignored_servers", self.ignored_servers),
                ("blocked_servers", self.blocked_servers),
            ):
                process_field(account_data.get(key), ruleset)

    def get_invite_rule(self, user_id: str) -> InviteRule:
        """Get the invite rule that matches this user. Will return InviteRule.ALLOW if no rules match

        The user ID is parsed with `UserID.from_string` before any rule is
        checked, so anything that call raises propagates to the caller.

        Args:
            user_id: The user ID of the inviting user.

        Returns:
            The rule of the first ruleset containing a matching pattern, or
            InviteRule.ALLOW if nothing matches.
        """
        user = UserID.from_string(user_id)
        # The order here is important. We always process user rules before server rules
        # and we always process in the order of Allow, Ignore, Block.
        # User rules are matched against the full user ID, server rules against
        # the domain part only.
        for subject, rulesets in (
            (
                user_id,
                (
                    (self.allowed_users, InviteRule.ALLOW),
                    (self.ignored_users, InviteRule.IGNORE),
                    (self.blocked_users, InviteRule.BLOCK),
                ),
            ),
            (
                user.domain,
                (
                    (self.allowed_servers, InviteRule.ALLOW),
                    (self.ignored_servers, InviteRule.IGNORE),
                    (self.blocked_servers, InviteRule.BLOCK),
                ),
            ),
        ):
            for patterns, rule in rulesets:
                # NOTE(review): `match` only anchors at the start; this presumably
                # relies on glob_to_regex emitting a fully anchored pattern - confirm.
                if any(regex.match(subject) for regex in patterns):
                    return rule

        return InviteRule.ALLOW