summary refs log tree commit diff
path: root/synapse/federation/federation_server.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r--synapse/federation/federation_server.py278
1 files changed, 241 insertions, 37 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 2d420a58a2..3e0cd294a1 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -14,28 +14,40 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import re
 
-import simplejson as json
-from twisted.internet import defer
+import six
+from six import iteritems
 
-from synapse.api.errors import AuthError, FederationError, SynapseError, NotFoundError
-from synapse.crypto.event_signing import compute_event_signature
-from synapse.federation.federation_base import (
-    FederationBase,
-    event_from_pdu_json,
-)
+from canonicaljson import json
+from prometheus_client import Counter
 
+from twisted.internet import defer
+from twisted.internet.abstract import isIPAddress
+from twisted.python import failure
+
+from synapse.api.constants import EventTypes
+from synapse.api.errors import (
+    AuthError,
+    FederationError,
+    IncompatibleRoomVersionError,
+    NotFoundError,
+    SynapseError,
+)
+from synapse.crypto.event_signing import compute_event_signature
+from synapse.federation.federation_base import FederationBase, event_from_pdu_json
 from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Edu, Transaction
+from synapse.http.endpoint import parse_server_name
+from synapse.replication.http.federation import (
+    ReplicationFederationSendEduRestServlet,
+    ReplicationGetQueryRestServlet,
+)
 from synapse.types import get_domain_from_id
-from synapse.util import async
+from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.logutils import log_function
 
-from prometheus_client import Counter
-
-from six import iteritems
-
 # when processing incoming transactions, we try to handle multiple rooms in
 # parallel, up to this limit.
 TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -59,8 +71,8 @@ class FederationServer(FederationBase):
         self.auth = hs.get_auth()
         self.handler = hs.get_handlers().federation_handler
 
-        self._server_linearizer = async.Linearizer("fed_server")
-        self._transaction_linearizer = async.Linearizer("fed_txn_handler")
+        self._server_linearizer = Linearizer("fed_server")
+        self._transaction_linearizer = Linearizer("fed_txn_handler")
 
         self.transaction_actions = TransactionActions(self.store)
 
@@ -74,6 +86,9 @@ class FederationServer(FederationBase):
     @log_function
     def on_backfill_request(self, origin, room_id, versions, limit):
         with (yield self._server_linearizer.queue((origin, room_id))):
+            origin_host, _ = parse_server_name(origin)
+            yield self.check_server_matches_acl(origin_host, room_id)
+
             pdus = yield self.handler.on_backfill_request(
                 origin, room_id, versions, limit
             )
@@ -134,6 +149,8 @@ class FederationServer(FederationBase):
 
         received_pdus_counter.inc(len(transaction.pdus))
 
+        origin_host, _ = parse_server_name(transaction.origin)
+
         pdus_by_room = {}
 
         for p in transaction.pdus:
@@ -154,9 +171,21 @@ class FederationServer(FederationBase):
         # we can process different rooms in parallel (which is useful if they
         # require callouts to other servers to fetch missing events), but
         # impose a limit to avoid going too crazy with ram/cpu.
+
         @defer.inlineCallbacks
         def process_pdus_for_room(room_id):
             logger.debug("Processing PDUs for %s", room_id)
+            try:
+                yield self.check_server_matches_acl(origin_host, room_id)
+            except AuthError as e:
+                logger.warn(
+                    "Ignoring PDUs for room %s from banned server", room_id,
+                )
+                for pdu in pdus_by_room[room_id]:
+                    event_id = pdu.event_id
+                    pdu_results[event_id] = e.error_dict()
+                return
+
             for pdu in pdus_by_room[room_id]:
                 event_id = pdu.event_id
                 try:
@@ -168,10 +197,14 @@ class FederationServer(FederationBase):
                     logger.warn("Error handling PDU %s: %s", event_id, e)
                     pdu_results[event_id] = {"error": str(e)}
                 except Exception as e:
+                    f = failure.Failure()
                     pdu_results[event_id] = {"error": str(e)}
-                    logger.exception("Failed to handle PDU %s", event_id)
+                    logger.error(
+                        "Failed to handle PDU %s: %s",
+                        event_id, f.getTraceback().rstrip(),
+                    )
 
-        yield async.concurrently_execute(
+        yield concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(),
             TRANSACTION_CONCURRENCY_LIMIT,
         )
@@ -184,10 +217,6 @@ class FederationServer(FederationBase):
                     edu.content
                 )
 
-        pdu_failures = getattr(transaction, "pdu_failures", [])
-        for failure in pdu_failures:
-            logger.info("Got failure %r", failure)
-
         response = {
             "pdus": pdu_results,
         }
@@ -211,6 +240,9 @@ class FederationServer(FederationBase):
         if not event_id:
             raise NotImplementedError("Specify an event")
 
+        origin_host, _ = parse_server_name(origin)
+        yield self.check_server_matches_acl(origin_host, room_id)
+
         in_room = yield self.auth.check_host_in_room(room_id, origin)
         if not in_room:
             raise AuthError(403, "Host not in room.")
@@ -234,6 +266,9 @@ class FederationServer(FederationBase):
         if not event_id:
             raise NotImplementedError("Specify an event")
 
+        origin_host, _ = parse_server_name(origin)
+        yield self.check_server_matches_acl(origin_host, room_id)
+
         in_room = yield self.auth.check_host_in_room(room_id, origin)
         if not in_room:
             raise AuthError(403, "Host not in room.")
@@ -277,7 +312,7 @@ class FederationServer(FederationBase):
     @defer.inlineCallbacks
     @log_function
     def on_pdu_request(self, origin, event_id):
-        pdu = yield self._get_persisted_pdu(origin, event_id)
+        pdu = yield self.handler.get_persisted_pdu(origin, event_id)
 
         if pdu:
             defer.returnValue(
@@ -298,14 +333,27 @@ class FederationServer(FederationBase):
         defer.returnValue((200, resp))
 
     @defer.inlineCallbacks
-    def on_make_join_request(self, room_id, user_id):
+    def on_make_join_request(self, origin, room_id, user_id, supported_versions):
+        origin_host, _ = parse_server_name(origin)
+        yield self.check_server_matches_acl(origin_host, room_id)
+
+        room_version = yield self.store.get_room_version(room_id)
+        if room_version not in supported_versions:
+            logger.warn("Room version %s not in %s", room_version, supported_versions)
+            raise IncompatibleRoomVersionError(room_version=room_version)
+
         pdu = yield self.handler.on_make_join_request(room_id, user_id)
         time_now = self._clock.time_msec()
-        defer.returnValue({"event": pdu.get_pdu_json(time_now)})
+        defer.returnValue({
+            "event": pdu.get_pdu_json(time_now),
+            "room_version": room_version,
+        })
 
     @defer.inlineCallbacks
     def on_invite_request(self, origin, content):
         pdu = event_from_pdu_json(content)
+        origin_host, _ = parse_server_name(origin)
+        yield self.check_server_matches_acl(origin_host, pdu.room_id)
         ret_pdu = yield self.handler.on_invite_request(origin, pdu)
         time_now = self._clock.time_msec()
         defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
@@ -314,6 +362,10 @@ class FederationServer(FederationBase):
     def on_send_join_request(self, origin, content):
         logger.debug("on_send_join_request: content: %s", content)
         pdu = event_from_pdu_json(content)
+
+        origin_host, _ = parse_server_name(origin)
+        yield self.check_server_matches_acl(origin_host, pdu.room_id)
+
         logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures)
         res_pdus = yield self.handler.on_send_join_request(origin, pdu)
         time_now = self._clock.time_msec()
@@ -325,7 +377,9 @@ class FederationServer(FederationBase):
         }))
 
     @defer.inlineCallbacks
-    def on_make_leave_request(self, room_id, user_id):
+    def on_make_leave_request(self, origin, room_id, user_id):
+        origin_host, _ = parse_server_name(origin)
+        yield self.check_server_matches_acl(origin_host, room_id)
         pdu = yield self.handler.on_make_leave_request(room_id, user_id)
         time_now = self._clock.time_msec()
         defer.returnValue({"event": pdu.get_pdu_json(time_now)})
@@ -334,6 +388,10 @@ class FederationServer(FederationBase):
     def on_send_leave_request(self, origin, content):
         logger.debug("on_send_leave_request: content: %s", content)
         pdu = event_from_pdu_json(content)
+
+        origin_host, _ = parse_server_name(origin)
+        yield self.check_server_matches_acl(origin_host, pdu.room_id)
+
         logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
         yield self.handler.on_send_leave_request(origin, pdu)
         defer.returnValue((200, {}))
@@ -341,6 +399,9 @@ class FederationServer(FederationBase):
     @defer.inlineCallbacks
     def on_event_auth(self, origin, room_id, event_id):
         with (yield self._server_linearizer.queue((origin, room_id))):
+            origin_host, _ = parse_server_name(origin)
+            yield self.check_server_matches_acl(origin_host, room_id)
+
             time_now = self._clock.time_msec()
             auth_pdus = yield self.handler.on_event_auth(event_id)
             res = {
@@ -369,6 +430,9 @@ class FederationServer(FederationBase):
             Deferred: Results in `dict` with the same format as `content`
         """
         with (yield self._server_linearizer.queue((origin, room_id))):
+            origin_host, _ = parse_server_name(origin)
+            yield self.check_server_matches_acl(origin_host, room_id)
+
             auth_chain = [
                 event_from_pdu_json(e)
                 for e in content["auth_chain"]
@@ -381,6 +445,7 @@ class FederationServer(FederationBase):
             ret = yield self.handler.on_query_auth(
                 origin,
                 event_id,
+                room_id,
                 signed_auth,
                 content.get("rejects", []),
                 content.get("missing", []),
@@ -442,6 +507,9 @@ class FederationServer(FederationBase):
     def on_get_missing_events(self, origin, room_id, earliest_events,
                               latest_events, limit, min_depth):
         with (yield self._server_linearizer.queue((origin, room_id))):
+            origin_host, _ = parse_server_name(origin)
+            yield self.check_server_matches_acl(origin_host, room_id)
+
             logger.info(
                 "on_get_missing_events: earliest_events: %r, latest_events: %r,"
                 " limit: %d, min_depth: %d",
@@ -470,17 +538,6 @@ class FederationServer(FederationBase):
         ts_now_ms = self._clock.time_msec()
         return self.store.get_user_id_for_open_id_token(token, ts_now_ms)
 
-    @log_function
-    def _get_persisted_pdu(self, origin, event_id, do_auth=True):
-        """ Get a PDU from the database with given origin and id.
-
-        Returns:
-            Deferred: Results in a `Pdu`.
-        """
-        return self.handler.get_persisted_pdu(
-            origin, event_id, do_auth=do_auth
-        )
-
     def _transaction_from_pdus(self, pdu_list):
         """Returns a new Transaction containing the given PDUs suitable for
         transmission.
@@ -560,7 +617,9 @@ class FederationServer(FederationBase):
                 affected=pdu.event_id,
             )
 
-        yield self.handler.on_receive_pdu(origin, pdu, get_missing=True)
+        yield self.handler.on_receive_pdu(
+            origin, pdu, get_missing=True, sent_to_us_directly=True,
+        )
 
     def __str__(self):
         return "<ReplicationLayer(%s)>" % self.server_name
@@ -588,6 +647,101 @@ class FederationServer(FederationBase):
         )
         defer.returnValue(ret)
 
+    @defer.inlineCallbacks
+    def check_server_matches_acl(self, server_name, room_id):
+        """Check if the given server is allowed by the server ACLs in the room
+
+        Args:
+            server_name (str): name of server, *without any port part*
+            room_id (str): ID of the room to check
+
+        Raises:
+            AuthError if the server does not match the ACL
+        """
+        state_ids = yield self.store.get_current_state_ids(room_id)
+        acl_event_id = state_ids.get((EventTypes.ServerACL, ""))
+
+        if not acl_event_id:
+            return
+
+        acl_event = yield self.store.get_event(acl_event_id)
+        if server_matches_acl_event(server_name, acl_event):
+            return
+
+        raise AuthError(code=403, msg="Server is banned from room")
+
+
+def server_matches_acl_event(server_name, acl_event):
+    """Check if the given server is allowed by the ACL event
+
+    Args:
+        server_name (str): name of server, without any port part
+        acl_event (EventBase): m.room.server_acl event
+
+    Returns:
+        bool: True if this server is allowed by the ACLs
+    """
+    logger.debug("Checking %s against acl %s", server_name, acl_event.content)
+
+    # first of all, check if literal IPs are blocked, and if so, whether the
+    # server name is a literal IP
+    allow_ip_literals = acl_event.content.get("allow_ip_literals", True)
+    if not isinstance(allow_ip_literals, bool):
+        logger.warn("Ignorning non-bool allow_ip_literals flag")
+        allow_ip_literals = True
+    if not allow_ip_literals:
+        # check for ipv6 literals. These start with '['.
+        if server_name[0] == '[':
+            return False
+
+        # check for ipv4 literals. We can just lift the routine from twisted.
+        if isIPAddress(server_name):
+            return False
+
+    # next,  check the deny list
+    deny = acl_event.content.get("deny", [])
+    if not isinstance(deny, (list, tuple)):
+        logger.warn("Ignorning non-list deny ACL %s", deny)
+        deny = []
+    for e in deny:
+        if _acl_entry_matches(server_name, e):
+            # logger.info("%s matched deny rule %s", server_name, e)
+            return False
+
+    # then the allow list.
+    allow = acl_event.content.get("allow", [])
+    if not isinstance(allow, (list, tuple)):
+        logger.warn("Ignorning non-list allow ACL %s", allow)
+        allow = []
+    for e in allow:
+        if _acl_entry_matches(server_name, e):
+            # logger.info("%s matched allow rule %s", server_name, e)
+            return True
+
+    # everything else should be rejected.
+    # logger.info("%s fell through", server_name)
+    return False
+
+
+def _acl_entry_matches(server_name, acl_entry):
+    if not isinstance(acl_entry, six.string_types):
+        logger.warn("Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry))
+        return False
+    regex = _glob_to_regex(acl_entry)
+    return regex.match(server_name)
+
+
+def _glob_to_regex(glob):
+    res = ''
+    for c in glob:
+        if c == '*':
+            res = res + '.*'
+        elif c == '?':
+            res = res + '.'
+        else:
+            res = res + re.escape(c)
+    return re.compile(res + "\\Z", re.IGNORECASE)
+
 
 class FederationHandlerRegistry(object):
     """Allows classes to register themselves as handlers for a given EDU or
@@ -610,6 +764,8 @@ class FederationHandlerRegistry(object):
         if edu_type in self.edu_handlers:
             raise KeyError("Already have an EDU handler for %s" % (edu_type,))
 
+        logger.info("Registering federation EDU handler for %r", edu_type)
+
         self.edu_handlers[edu_type] = handler
 
     def register_query_handler(self, query_type, handler):
@@ -628,6 +784,8 @@ class FederationHandlerRegistry(object):
                 "Already have a Query handler for %s" % (query_type,)
             )
 
+        logger.info("Registering federation query handler for %r", query_type)
+
         self.query_handlers[query_type] = handler
 
     @defer.inlineCallbacks
@@ -650,3 +808,49 @@ class FederationHandlerRegistry(object):
             raise NotFoundError("No handler for Query type '%s'" % (query_type,))
 
         return handler(args)
+
+
+class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
+    """A FederationHandlerRegistry for worker processes.
+
+    When receiving EDU or queries it will check if an appropriate handler has
+    been registered on the worker, if there isn't one then it calls off to the
+    master process.
+    """
+
+    def __init__(self, hs):
+        self.config = hs.config
+        self.http_client = hs.get_simple_http_client()
+        self.clock = hs.get_clock()
+
+        self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
+        self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
+
+        super(ReplicationFederationHandlerRegistry, self).__init__()
+
+    def on_edu(self, edu_type, origin, content):
+        """Overrides FederationHandlerRegistry
+        """
+        handler = self.edu_handlers.get(edu_type)
+        if handler:
+            return super(ReplicationFederationHandlerRegistry, self).on_edu(
+                edu_type, origin, content,
+            )
+
+        return self._send_edu(
+                edu_type=edu_type,
+                origin=origin,
+                content=content,
+        )
+
+    def on_query(self, query_type, args):
+        """Overrides FederationHandlerRegistry
+        """
+        handler = self.query_handlers.get(query_type)
+        if handler:
+            return handler(args)
+
+        return self._get_query_client(
+                query_type=query_type,
+                args=args,
+        )