diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index f9915e5a3f..ec8e770430 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -29,10 +29,8 @@ from typing import (
Union,
)
-from matrix_common.regex import glob_to_regex
from prometheus_client import Counter, Gauge, Histogram
-from twisted.internet.abstract import isIPAddress
from twisted.python import failure
from synapse.api.constants import (
@@ -1324,75 +1322,13 @@ class FederationServer(FederationBase):
Raises:
AuthError if the server does not match the ACL
"""
- acl_event = await self._storage_controllers.state.get_current_state_event(
- room_id, EventTypes.ServerACL, ""
+ server_acl_evaluator = (
+ await self._storage_controllers.state.get_server_acl_for_room(room_id)
)
- if not acl_event or server_matches_acl_event(server_name, acl_event):
- return
-
- raise AuthError(code=403, msg="Server is banned from room")
-
-
-def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool:
- """Check if the given server is allowed by the ACL event
-
- Args:
- server_name: name of server, without any port part
- acl_event: m.room.server_acl event
-
- Returns:
- True if this server is allowed by the ACLs
- """
- logger.debug("Checking %s against acl %s", server_name, acl_event.content)
-
- # first of all, check if literal IPs are blocked, and if so, whether the
- # server name is a literal IP
- allow_ip_literals = acl_event.content.get("allow_ip_literals", True)
- if not isinstance(allow_ip_literals, bool):
- logger.warning("Ignoring non-bool allow_ip_literals flag")
- allow_ip_literals = True
- if not allow_ip_literals:
- # check for ipv6 literals. These start with '['.
- if server_name[0] == "[":
- return False
-
- # check for ipv4 literals. We can just lift the routine from twisted.
- if isIPAddress(server_name):
- return False
-
- # next, check the deny list
- deny = acl_event.content.get("deny", [])
- if not isinstance(deny, (list, tuple)):
- logger.warning("Ignoring non-list deny ACL %s", deny)
- deny = []
- for e in deny:
- if _acl_entry_matches(server_name, e):
- # logger.info("%s matched deny rule %s", server_name, e)
- return False
-
- # then the allow list.
- allow = acl_event.content.get("allow", [])
- if not isinstance(allow, (list, tuple)):
- logger.warning("Ignoring non-list allow ACL %s", allow)
- allow = []
- for e in allow:
- if _acl_entry_matches(server_name, e):
- # logger.info("%s matched allow rule %s", server_name, e)
- return True
-
- # everything else should be rejected.
- # logger.info("%s fell through", server_name)
- return False
-
-
-def _acl_entry_matches(server_name: str, acl_entry: Any) -> bool:
- if not isinstance(acl_entry, str):
- logger.warning(
- "Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry)
- )
- return False
- regex = glob_to_regex(acl_entry)
- return bool(regex.match(server_name))
+ if server_acl_evaluator and not server_acl_evaluator.server_matches_acl_event(
+ server_name
+ ):
+ raise AuthError(code=403, msg="Server is banned from room")
class FederationHandlerRegistry:
|