summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/__init__.py9
-rw-r--r--synapse/federation/federation_client.py4
-rw-r--r--synapse/federation/federation_server.py31
-rw-r--r--synapse/federation/replication.py2
-rw-r--r--synapse/federation/transaction_queue.py3
-rw-r--r--synapse/federation/transport/__init__.py52
-rw-r--r--synapse/federation/transport/client.py5
-rw-r--r--synapse/federation/transport/server.py98
8 files changed, 95 insertions, 109 deletions
diff --git a/synapse/federation/__init__.py b/synapse/federation/__init__.py
index 0bfb79d09f..979fdf2431 100644
--- a/synapse/federation/__init__.py
+++ b/synapse/federation/__init__.py
@@ -17,15 +17,10 @@
 """
 
 from .replication import ReplicationLayer
-from .transport import TransportLayer
+from .transport.client import TransportLayerClient
 
 
 def initialize_http_replication(homeserver):
-    transport = TransportLayer(
-        homeserver,
-        homeserver.hostname,
-        server=homeserver.get_resource_for_federation(),
-        client=homeserver.get_http_client()
-    )
+    transport = TransportLayerClient(homeserver)
 
     return ReplicationLayer(homeserver, transport)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index c6259f9dc8..83c1f46586 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -57,7 +57,7 @@ class FederationClient(FederationBase):
             cache_name="get_pdu_cache",
             clock=self._clock,
             max_len=1000,
-            expiry_ms=120*1000,
+            expiry_ms=120 * 1000,
             reset_expiry_on_get=False,
         )
 
@@ -114,7 +114,7 @@ class FederationClient(FederationBase):
 
     @log_function
     def make_query(self, destination, query_type, args,
-                   retry_on_dns_fail=True):
+                   retry_on_dns_fail=False):
         """Sends a federation Query to a remote homeserver of the given type
         and arguments.
 
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index a97aa0c94a..76820b924b 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -126,10 +126,8 @@ class FederationServer(FederationBase):
         results = []
 
         for pdu in pdu_list:
-            d = self._handle_new_pdu(transaction.origin, pdu)
-
             try:
-                yield d
+                yield self._handle_new_pdu(transaction.origin, pdu)
                 results.append({})
             except FederationError as e:
                 self.send_failure(e, transaction.origin)
@@ -139,8 +137,8 @@ class FederationServer(FederationBase):
                 logger.exception("Failed to handle PDU")
 
         if hasattr(transaction, "edus"):
-            for edu in [Edu(**x) for x in transaction.edus]:
-                self.received_edu(
+            for edu in (Edu(**x) for x in transaction.edus):
+                yield self.received_edu(
                     transaction.origin,
                     edu.edu_type,
                     edu.content
@@ -163,11 +161,17 @@ class FederationServer(FederationBase):
         )
         defer.returnValue((200, response))
 
+    @defer.inlineCallbacks
     def received_edu(self, origin, edu_type, content):
         received_edus_counter.inc()
 
         if edu_type in self.edu_handlers:
-            self.edu_handlers[edu_type](origin, content)
+            try:
+                yield self.edu_handlers[edu_type](origin, content)
+            except SynapseError as e:
+                logger.info("Failed to handle edu %r: %r", edu_type, e)
+            except Exception as e:
+                logger.exception("Failed to handle edu %r", edu_type, e)
         else:
             logger.warn("Received EDU of type %s with no handler", edu_type)
 
@@ -545,8 +549,19 @@ class FederationServer(FederationBase):
         return event
 
     @defer.inlineCallbacks
-    def exchange_third_party_invite(self, invite):
-        ret = yield self.handler.exchange_third_party_invite(invite)
+    def exchange_third_party_invite(
+            self,
+            sender_user_id,
+            target_user_id,
+            room_id,
+            signed,
+    ):
+        ret = yield self.handler.exchange_third_party_invite(
+            sender_user_id,
+            target_user_id,
+            room_id,
+            signed,
+        )
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 6e0be8ef15..3e062a5eab 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -54,8 +54,6 @@ class ReplicationLayer(FederationClient, FederationServer):
         self.keyring = hs.get_keyring()
 
         self.transport_layer = transport_layer
-        self.transport_layer.register_received_handler(self)
-        self.transport_layer.register_request_handler(self)
 
         self.federation_client = self
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 622adad3ae..1928da03b3 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -103,7 +103,6 @@ class TransactionQueue(object):
         else:
             return not destination.startswith("localhost")
 
-    @defer.inlineCallbacks
     def enqueue_pdu(self, pdu, destinations, order):
         # We loop through all destinations to see whether we already have
         # a transaction in progress. If we do, stick it in the pending_pdus
@@ -141,8 +140,6 @@ class TransactionQueue(object):
 
             deferreds.append(deferred)
 
-        yield defer.DeferredList(deferreds, consumeErrors=True)
-
     # NO inlineCallbacks
     def enqueue_edu(self, edu):
         destination = edu.destination
diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py
index 155a7d5870..d9fcc520a0 100644
--- a/synapse/federation/transport/__init__.py
+++ b/synapse/federation/transport/__init__.py
@@ -20,55 +20,3 @@ By default this is done over HTTPS (and all home servers are required to
 support HTTPS), however individual pairings of servers may decide to
 communicate over a different (albeit still reliable) protocol.
 """
-
-from .server import TransportLayerServer
-from .client import TransportLayerClient
-
-from synapse.util.ratelimitutils import FederationRateLimiter
-
-
-class TransportLayer(TransportLayerServer, TransportLayerClient):
-    """This is a basic implementation of the transport layer that translates
-    transactions and other requests to/from HTTP.
-
-    Attributes:
-        server_name (str): Local home server host
-
-        server (synapse.http.server.HttpServer): the http server to
-                register listeners on
-
-        client (synapse.http.client.HttpClient): the http client used to
-                send requests
-
-        request_handler (TransportRequestHandler): The handler to fire when we
-            receive requests for data.
-
-        received_handler (TransportReceivedHandler): The handler to fire when
-            we receive data.
-    """
-
-    def __init__(self, homeserver, server_name, server, client):
-        """
-        Args:
-            server_name (str): Local home server host
-            server (synapse.protocol.http.HttpServer): the http server to
-                register listeners on
-            client (synapse.protocol.http.HttpClient): the http client used to
-                send requests
-        """
-        self.keyring = homeserver.get_keyring()
-        self.clock = homeserver.get_clock()
-        self.server_name = server_name
-        self.server = server
-        self.client = client
-        self.request_handler = None
-        self.received_handler = None
-
-        self.ratelimiter = FederationRateLimiter(
-            self.clock,
-            window_size=homeserver.config.federation_rc_window_size,
-            sleep_limit=homeserver.config.federation_rc_sleep_limit,
-            sleep_msec=homeserver.config.federation_rc_sleep_delay,
-            reject_limit=homeserver.config.federation_rc_reject_limit,
-            concurrent_requests=homeserver.config.federation_rc_concurrent,
-        )
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 949d01dea8..2237e3413c 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -28,6 +28,10 @@ logger = logging.getLogger(__name__)
 class TransportLayerClient(object):
     """Sends federation HTTP requests to other servers"""
 
+    def __init__(self, hs):
+        self.server_name = hs.hostname
+        self.client = hs.get_http_client()
+
     @log_function
     def get_room_state(self, destination, room_id, event_id):
         """ Requests all state for a given room from the given server at the
@@ -156,6 +160,7 @@ class TransportLayerClient(object):
             path=path,
             args=args,
             retry_on_dns_fail=retry_on_dns_fail,
+            timeout=10000,
         )
 
         defer.returnValue(content)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 8dca0a7f6b..208bff8d4f 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -17,7 +17,9 @@ from twisted.internet import defer
 
 from synapse.api.urls import FEDERATION_PREFIX as PREFIX
 from synapse.api.errors import Codes, SynapseError
-from synapse.util.logutils import log_function
+from synapse.http.server import JsonResource
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.util.ratelimitutils import FederationRateLimiter
 
 import functools
 import logging
@@ -28,9 +30,41 @@ import re
 logger = logging.getLogger(__name__)
 
 
-class TransportLayerServer(object):
+class TransportLayerServer(JsonResource):
     """Handles incoming federation HTTP requests"""
 
+    def __init__(self, hs):
+        self.hs = hs
+        self.clock = hs.get_clock()
+
+        super(TransportLayerServer, self).__init__(hs)
+
+        self.authenticator = Authenticator(hs)
+        self.ratelimiter = FederationRateLimiter(
+            self.clock,
+            window_size=hs.config.federation_rc_window_size,
+            sleep_limit=hs.config.federation_rc_sleep_limit,
+            sleep_msec=hs.config.federation_rc_sleep_delay,
+            reject_limit=hs.config.federation_rc_reject_limit,
+            concurrent_requests=hs.config.federation_rc_concurrent,
+        )
+
+        self.register_servlets()
+
+    def register_servlets(self):
+        register_servlets(
+            self.hs,
+            resource=self,
+            ratelimiter=self.ratelimiter,
+            authenticator=self.authenticator,
+        )
+
+
+class Authenticator(object):
+    def __init__(self, hs):
+        self.keyring = hs.get_keyring()
+        self.server_name = hs.hostname
+
     # A method just so we can pass 'self' as the authenticator to the Servlets
     @defer.inlineCallbacks
     def authenticate_request(self, request):
@@ -98,37 +132,9 @@ class TransportLayerServer(object):
 
         defer.returnValue((origin, content))
 
-    @log_function
-    def register_received_handler(self, handler):
-        """ Register a handler that will be fired when we receive data.
-
-        Args:
-            handler (TransportReceivedHandler)
-        """
-        FederationSendServlet(
-            handler,
-            authenticator=self,
-            ratelimiter=self.ratelimiter,
-            server_name=self.server_name,
-        ).register(self.server)
-
-    @log_function
-    def register_request_handler(self, handler):
-        """ Register a handler that will be fired when we get asked for data.
-
-        Args:
-            handler (TransportRequestHandler)
-        """
-        for servletclass in SERVLET_CLASSES:
-            servletclass(
-                handler,
-                authenticator=self,
-                ratelimiter=self.ratelimiter,
-            ).register(self.server)
-
 
 class BaseFederationServlet(object):
-    def __init__(self, handler, authenticator, ratelimiter):
+    def __init__(self, handler, authenticator, ratelimiter, server_name):
         self.handler = handler
         self.authenticator = authenticator
         self.ratelimiter = ratelimiter
@@ -172,7 +178,9 @@ class FederationSendServlet(BaseFederationServlet):
     PATH = "/send/([^/]*)/"
 
     def __init__(self, handler, server_name, **kwargs):
-        super(FederationSendServlet, self).__init__(handler, **kwargs)
+        super(FederationSendServlet, self).__init__(
+            handler, server_name=server_name, **kwargs
+        )
         self.server_name = server_name
 
     # This is when someone is trying to send us a bunch of data.
@@ -412,13 +420,22 @@ class On3pidBindServlet(BaseFederationServlet):
 
     @defer.inlineCallbacks
     def on_POST(self, request):
-        content_bytes = request.content.read()
-        content = json.loads(content_bytes)
+        content = parse_json_object_from_request(request)
         if "invites" in content:
             last_exception = None
             for invite in content["invites"]:
                 try:
-                    yield self.handler.exchange_third_party_invite(invite)
+                    if "signed" not in invite or "token" not in invite["signed"]:
+                        message = ("Rejecting received notification of third-"
+                                   "party invite without signed: %s" % (invite,))
+                        logger.info(message)
+                        raise SynapseError(400, message)
+                    yield self.handler.exchange_third_party_invite(
+                        invite["sender"],
+                        invite["mxid"],
+                        invite["room_id"],
+                        invite["signed"],
+                    )
                 except Exception as e:
                     last_exception = e
             if last_exception:
@@ -432,6 +449,7 @@ class On3pidBindServlet(BaseFederationServlet):
 
 
 SERVLET_CLASSES = (
+    FederationSendServlet,
     FederationPullServlet,
     FederationEventServlet,
     FederationStateServlet,
@@ -451,3 +469,13 @@ SERVLET_CLASSES = (
     FederationThirdPartyInviteExchangeServlet,
     On3pidBindServlet,
 )
+
+
+def register_servlets(hs, resource, authenticator, ratelimiter):
+    for servletclass in SERVLET_CLASSES:
+        servletclass(
+            handler=hs.get_replication_layer(),
+            authenticator=authenticator,
+            ratelimiter=ratelimiter,
+            server_name=hs.hostname,
+        ).register(resource)