diff --git a/synapse/storage/invite_rule.py b/synapse/storage/invite_rule.py
new file mode 100644
index 0000000000..b9d9d1eb62
--- /dev/null
+++ b/synapse/storage/invite_rule.py
@@ -0,0 +1,110 @@
+import logging
+from enum import Enum
+from typing import Optional, Pattern
+
+from matrix_common.regex import glob_to_regex
+
+from synapse.types import JsonMapping, UserID
+
+logger = logging.getLogger(__name__)
+
+
+class InviteRule(Enum):
+ """Enum to define the action taken when an invite matches a rule."""
+
+ ALLOW = "allow"
+ BLOCK = "block"
+ IGNORE = "ignore"
+
+
+class InviteRulesConfig:
+ """Class to determine if a given user permits an invite from another user, and the action to take."""
+
+ def __init__(self, account_data: Optional[JsonMapping]):
+ self.allowed_users: list[Pattern[str]] = []
+ self.ignored_users: list[Pattern[str]] = []
+ self.blocked_users: list[Pattern[str]] = []
+
+ self.allowed_servers: list[Pattern[str]] = []
+ self.ignored_servers: list[Pattern[str]] = []
+ self.blocked_servers: list[Pattern[str]] = []
+
+ def process_field(
+ values: Optional[list[str]],
+ ruleset: list[Pattern[str]],
+ rule: InviteRule,
+ ) -> None:
+ if isinstance(values, list):
+ for value in values:
+ if isinstance(value, str) and len(value) > 0:
+ # User IDs cannot exceed 255 bytes. Don't process large, potentially
+ # expensive glob patterns.
+ if len(value) > 255:
+ logger.debug(
+ "Ignoring invite config glob pattern that is >255 bytes: {value}"
+ )
+ continue
+
+ try:
+ ruleset.append(glob_to_regex(value))
+ except Exception as e:
+ # If for whatever reason we can't process this, just ignore it.
+ logger.debug(
+ f"Could not process '{value}' field of invite rule config, ignoring: {e}"
+ )
+
+ if account_data:
+ process_field(
+ account_data.get("allowed_users"), self.allowed_users, InviteRule.ALLOW
+ )
+ process_field(
+ account_data.get("ignored_users"), self.ignored_users, InviteRule.IGNORE
+ )
+ process_field(
+ account_data.get("blocked_users"), self.blocked_users, InviteRule.BLOCK
+ )
+ process_field(
+ account_data.get("allowed_servers"),
+ self.allowed_servers,
+ InviteRule.ALLOW,
+ )
+ process_field(
+ account_data.get("ignored_servers"),
+ self.ignored_servers,
+ InviteRule.IGNORE,
+ )
+ process_field(
+ account_data.get("blocked_servers"),
+ self.blocked_servers,
+ InviteRule.BLOCK,
+ )
+
+ def get_invite_rule(self, user_id: str) -> InviteRule:
+ """Get the invite rule that matches this user. Will return InviteRule.ALLOW if no rules match
+
+ Args:
+ user_id: The user ID of the inviting user.
+
+ """
+ user = UserID.from_string(user_id)
+ # The order here is important. We always process user rules before server rules
+ # and we always process in the order of Allow, Ignore, Block.
+ for patterns, rule in [
+ (self.allowed_users, InviteRule.ALLOW),
+ (self.ignored_users, InviteRule.IGNORE),
+ (self.blocked_users, InviteRule.BLOCK),
+ ]:
+ for regex in patterns:
+ if regex.match(user_id):
+ return rule
+
+ for patterns, rule in [
+ (self.allowed_servers, InviteRule.ALLOW),
+ (self.ignored_servers, InviteRule.IGNORE),
+ (self.blocked_servers, InviteRule.BLOCK),
+ ]:
+ for regex in patterns:
+ if regex.match(user.domain):
+ return rule
+
+ return InviteRule.ALLOW
|