1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
import logging
from enum import Enum
from typing import Optional, Pattern
from matrix_common.regex import glob_to_regex
from synapse.types import JsonMapping, UserID
logger = logging.getLogger(__name__)
class InviteRule(Enum):
    """The possible outcomes when an incoming invite is checked against a rule.

    Each member's value is the lowercase string form of the action.
    """

    # The invite goes through.
    ALLOW = "allow"
    # The invite is blocked.
    BLOCK = "block"
    # The invite is ignored.
    IGNORE = "ignore"
class InviteRulesConfig:
    """Determine whether a user permits an invite from another user, and what to do with it.

    Rules are read from an account-data mapping with six optional keys:
    ``allowed_users``, ``ignored_users``, ``blocked_users``, ``allowed_servers``,
    ``ignored_servers`` and ``blocked_servers``. Each key is expected to hold a
    list of glob strings; values of any other shape are skipped without error.
    """

    def __init__(self, account_data: Optional[JsonMapping]):
        """Compile the glob patterns found in ``account_data``.

        Args:
            account_data: The invite rule configuration, or None (or an empty
                mapping) if the user has none, in which case every rule list
                is left empty.
        """
        # A missing or empty config behaves like a mapping with no keys: every
        # lookup below yields None and therefore an empty rule list.
        content = account_data or {}

        self.allowed_users: list[Pattern[str]] = self._compile_globs(
            content.get("allowed_users")
        )
        self.ignored_users: list[Pattern[str]] = self._compile_globs(
            content.get("ignored_users")
        )
        self.blocked_users: list[Pattern[str]] = self._compile_globs(
            content.get("blocked_users")
        )
        self.allowed_servers: list[Pattern[str]] = self._compile_globs(
            content.get("allowed_servers")
        )
        self.ignored_servers: list[Pattern[str]] = self._compile_globs(
            content.get("ignored_servers")
        )
        self.blocked_servers: list[Pattern[str]] = self._compile_globs(
            content.get("blocked_servers")
        )

    @staticmethod
    def _compile_globs(values: object) -> list[Pattern[str]]:
        """Compile a list of glob strings into regexes, skipping unusable entries.

        Args:
            values: The raw value of one config field. Anything that is not a
                list yields an empty result; list entries that are not
                non-empty strings are skipped.

        Returns:
            The compiled patterns, in the same order as the accepted entries.
        """
        compiled: list[Pattern[str]] = []
        if not isinstance(values, list):
            return compiled

        for value in values:
            if not isinstance(value, str) or not value:
                continue

            # User IDs cannot exceed 255 bytes. Don't process large, potentially
            # expensive glob patterns. Note that len() counts code points rather
            # than encoded bytes, so this is a length cap on the pattern text.
            if len(value) > 255:
                logger.debug(
                    "Ignoring invite config glob pattern that is >255 bytes: %s",
                    value,
                )
                continue

            try:
                compiled.append(glob_to_regex(value))
            except Exception as e:
                # Deliberately best-effort: if for whatever reason we can't
                # process this entry, just ignore it and keep the rest.
                logger.debug(
                    "Could not process '%s' field of invite rule config, ignoring: %s",
                    value,
                    e,
                )
        return compiled

    def get_invite_rule(self, user_id: str) -> InviteRule:
        """Get the invite rule that matches this user.

        Will return InviteRule.ALLOW if no rules match.

        Args:
            user_id: The user ID of the inviting user.

        Returns:
            The action of the first matching rule, or InviteRule.ALLOW.

        Raises:
            Whatever ``UserID.from_string`` raises for a user ID it cannot parse.
        """
        user = UserID.from_string(user_id)

        # The order here is important. We always process user rules before server
        # rules, and within each group we always process in the order of
        # Allow, Ignore, Block.
        checks = (
            (
                user_id,
                (
                    (self.allowed_users, InviteRule.ALLOW),
                    (self.ignored_users, InviteRule.IGNORE),
                    (self.blocked_users, InviteRule.BLOCK),
                ),
            ),
            (
                user.domain,
                (
                    (self.allowed_servers, InviteRule.ALLOW),
                    (self.ignored_servers, InviteRule.IGNORE),
                    (self.blocked_servers, InviteRule.BLOCK),
                ),
            ),
        )
        for subject, rulesets in checks:
            for patterns, rule in rulesets:
                if any(regex.match(subject) for regex in patterns):
                    return rule

        return InviteRule.ALLOW
|