diff --git a/synapse/federation/__init__.py b/synapse/federation/__init__.py
index 0bfb79d09f..979fdf2431 100644
--- a/synapse/federation/__init__.py
+++ b/synapse/federation/__init__.py
@@ -17,15 +17,10 @@
"""
from .replication import ReplicationLayer
-from .transport import TransportLayer
+from .transport.client import TransportLayerClient
def initialize_http_replication(homeserver):
- transport = TransportLayer(
- homeserver,
- homeserver.hostname,
- server=homeserver.get_resource_for_federation(),
- client=homeserver.get_http_client()
- )
+ transport = TransportLayerClient(homeserver)
return ReplicationLayer(homeserver, transport)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index c6259f9dc8..83c1f46586 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -57,7 +57,7 @@ class FederationClient(FederationBase):
cache_name="get_pdu_cache",
clock=self._clock,
max_len=1000,
- expiry_ms=120*1000,
+ expiry_ms=120 * 1000,
reset_expiry_on_get=False,
)
@@ -114,7 +114,7 @@ class FederationClient(FederationBase):
@log_function
def make_query(self, destination, query_type, args,
- retry_on_dns_fail=True):
+ retry_on_dns_fail=False):
"""Sends a federation Query to a remote homeserver of the given type
and arguments.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index a97aa0c94a..76820b924b 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -126,10 +126,8 @@ class FederationServer(FederationBase):
results = []
for pdu in pdu_list:
- d = self._handle_new_pdu(transaction.origin, pdu)
-
try:
- yield d
+ yield self._handle_new_pdu(transaction.origin, pdu)
results.append({})
except FederationError as e:
self.send_failure(e, transaction.origin)
@@ -139,8 +137,8 @@ class FederationServer(FederationBase):
logger.exception("Failed to handle PDU")
if hasattr(transaction, "edus"):
- for edu in [Edu(**x) for x in transaction.edus]:
- self.received_edu(
+ for edu in (Edu(**x) for x in transaction.edus):
+ yield self.received_edu(
transaction.origin,
edu.edu_type,
edu.content
@@ -163,11 +161,17 @@ class FederationServer(FederationBase):
)
defer.returnValue((200, response))
+ @defer.inlineCallbacks
def received_edu(self, origin, edu_type, content):
received_edus_counter.inc()
if edu_type in self.edu_handlers:
- self.edu_handlers[edu_type](origin, content)
+ try:
+ yield self.edu_handlers[edu_type](origin, content)
+ except SynapseError as e:
+ logger.info("Failed to handle edu %r: %r", edu_type, e)
+ except Exception as e:
+ logger.exception("Failed to handle edu %r", edu_type, e)
else:
logger.warn("Received EDU of type %s with no handler", edu_type)
@@ -545,8 +549,19 @@ class FederationServer(FederationBase):
return event
@defer.inlineCallbacks
- def exchange_third_party_invite(self, invite):
- ret = yield self.handler.exchange_third_party_invite(invite)
+ def exchange_third_party_invite(
+ self,
+ sender_user_id,
+ target_user_id,
+ room_id,
+ signed,
+ ):
+ ret = yield self.handler.exchange_third_party_invite(
+ sender_user_id,
+ target_user_id,
+ room_id,
+ signed,
+ )
defer.returnValue(ret)
@defer.inlineCallbacks
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 6e0be8ef15..3e062a5eab 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -54,8 +54,6 @@ class ReplicationLayer(FederationClient, FederationServer):
self.keyring = hs.get_keyring()
self.transport_layer = transport_layer
- self.transport_layer.register_received_handler(self)
- self.transport_layer.register_request_handler(self)
self.federation_client = self
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 622adad3ae..1928da03b3 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -103,7 +103,6 @@ class TransactionQueue(object):
else:
return not destination.startswith("localhost")
- @defer.inlineCallbacks
def enqueue_pdu(self, pdu, destinations, order):
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
@@ -141,8 +140,6 @@ class TransactionQueue(object):
deferreds.append(deferred)
- yield defer.DeferredList(deferreds, consumeErrors=True)
-
# NO inlineCallbacks
def enqueue_edu(self, edu):
destination = edu.destination
diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py
index 155a7d5870..d9fcc520a0 100644
--- a/synapse/federation/transport/__init__.py
+++ b/synapse/federation/transport/__init__.py
@@ -20,55 +20,3 @@ By default this is done over HTTPS (and all home servers are required to
support HTTPS), however individual pairings of servers may decide to
communicate over a different (albeit still reliable) protocol.
"""
-
-from .server import TransportLayerServer
-from .client import TransportLayerClient
-
-from synapse.util.ratelimitutils import FederationRateLimiter
-
-
-class TransportLayer(TransportLayerServer, TransportLayerClient):
- """This is a basic implementation of the transport layer that translates
- transactions and other requests to/from HTTP.
-
- Attributes:
- server_name (str): Local home server host
-
- server (synapse.http.server.HttpServer): the http server to
- register listeners on
-
- client (synapse.http.client.HttpClient): the http client used to
- send requests
-
- request_handler (TransportRequestHandler): The handler to fire when we
- receive requests for data.
-
- received_handler (TransportReceivedHandler): The handler to fire when
- we receive data.
- """
-
- def __init__(self, homeserver, server_name, server, client):
- """
- Args:
- server_name (str): Local home server host
- server (synapse.protocol.http.HttpServer): the http server to
- register listeners on
- client (synapse.protocol.http.HttpClient): the http client used to
- send requests
- """
- self.keyring = homeserver.get_keyring()
- self.clock = homeserver.get_clock()
- self.server_name = server_name
- self.server = server
- self.client = client
- self.request_handler = None
- self.received_handler = None
-
- self.ratelimiter = FederationRateLimiter(
- self.clock,
- window_size=homeserver.config.federation_rc_window_size,
- sleep_limit=homeserver.config.federation_rc_sleep_limit,
- sleep_msec=homeserver.config.federation_rc_sleep_delay,
- reject_limit=homeserver.config.federation_rc_reject_limit,
- concurrent_requests=homeserver.config.federation_rc_concurrent,
- )
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 949d01dea8..2237e3413c 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -28,6 +28,10 @@ logger = logging.getLogger(__name__)
class TransportLayerClient(object):
"""Sends federation HTTP requests to other servers"""
+ def __init__(self, hs):
+ self.server_name = hs.hostname
+ self.client = hs.get_http_client()
+
@log_function
def get_room_state(self, destination, room_id, event_id):
""" Requests all state for a given room from the given server at the
@@ -156,6 +160,7 @@ class TransportLayerClient(object):
path=path,
args=args,
retry_on_dns_fail=retry_on_dns_fail,
+ timeout=10000,
)
defer.returnValue(content)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 8dca0a7f6b..208bff8d4f 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -17,7 +17,9 @@ from twisted.internet import defer
from synapse.api.urls import FEDERATION_PREFIX as PREFIX
from synapse.api.errors import Codes, SynapseError
-from synapse.util.logutils import log_function
+from synapse.http.server import JsonResource
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.util.ratelimitutils import FederationRateLimiter
import functools
import logging
@@ -28,9 +30,41 @@ import re
logger = logging.getLogger(__name__)
-class TransportLayerServer(object):
+class TransportLayerServer(JsonResource):
"""Handles incoming federation HTTP requests"""
+ def __init__(self, hs):
+ self.hs = hs
+ self.clock = hs.get_clock()
+
+ super(TransportLayerServer, self).__init__(hs)
+
+ self.authenticator = Authenticator(hs)
+ self.ratelimiter = FederationRateLimiter(
+ self.clock,
+ window_size=hs.config.federation_rc_window_size,
+ sleep_limit=hs.config.federation_rc_sleep_limit,
+ sleep_msec=hs.config.federation_rc_sleep_delay,
+ reject_limit=hs.config.federation_rc_reject_limit,
+ concurrent_requests=hs.config.federation_rc_concurrent,
+ )
+
+ self.register_servlets()
+
+ def register_servlets(self):
+ register_servlets(
+ self.hs,
+ resource=self,
+ ratelimiter=self.ratelimiter,
+ authenticator=self.authenticator,
+ )
+
+
+class Authenticator(object):
+ def __init__(self, hs):
+ self.keyring = hs.get_keyring()
+ self.server_name = hs.hostname
+
# A method just so we can pass 'self' as the authenticator to the Servlets
@defer.inlineCallbacks
def authenticate_request(self, request):
@@ -98,37 +132,9 @@ class TransportLayerServer(object):
defer.returnValue((origin, content))
- @log_function
- def register_received_handler(self, handler):
- """ Register a handler that will be fired when we receive data.
-
- Args:
- handler (TransportReceivedHandler)
- """
- FederationSendServlet(
- handler,
- authenticator=self,
- ratelimiter=self.ratelimiter,
- server_name=self.server_name,
- ).register(self.server)
-
- @log_function
- def register_request_handler(self, handler):
- """ Register a handler that will be fired when we get asked for data.
-
- Args:
- handler (TransportRequestHandler)
- """
- for servletclass in SERVLET_CLASSES:
- servletclass(
- handler,
- authenticator=self,
- ratelimiter=self.ratelimiter,
- ).register(self.server)
-
class BaseFederationServlet(object):
- def __init__(self, handler, authenticator, ratelimiter):
+ def __init__(self, handler, authenticator, ratelimiter, server_name):
self.handler = handler
self.authenticator = authenticator
self.ratelimiter = ratelimiter
@@ -172,7 +178,9 @@ class FederationSendServlet(BaseFederationServlet):
PATH = "/send/([^/]*)/"
def __init__(self, handler, server_name, **kwargs):
- super(FederationSendServlet, self).__init__(handler, **kwargs)
+ super(FederationSendServlet, self).__init__(
+ handler, server_name=server_name, **kwargs
+ )
self.server_name = server_name
# This is when someone is trying to send us a bunch of data.
@@ -412,13 +420,22 @@ class On3pidBindServlet(BaseFederationServlet):
@defer.inlineCallbacks
def on_POST(self, request):
- content_bytes = request.content.read()
- content = json.loads(content_bytes)
+ content = parse_json_object_from_request(request)
if "invites" in content:
last_exception = None
for invite in content["invites"]:
try:
- yield self.handler.exchange_third_party_invite(invite)
+ if "signed" not in invite or "token" not in invite["signed"]:
+ message = ("Rejecting received notification of third-"
+ "party invite without signed: %s" % (invite,))
+ logger.info(message)
+ raise SynapseError(400, message)
+ yield self.handler.exchange_third_party_invite(
+ invite["sender"],
+ invite["mxid"],
+ invite["room_id"],
+ invite["signed"],
+ )
except Exception as e:
last_exception = e
if last_exception:
@@ -432,6 +449,7 @@ class On3pidBindServlet(BaseFederationServlet):
SERVLET_CLASSES = (
+ FederationSendServlet,
FederationPullServlet,
FederationEventServlet,
FederationStateServlet,
@@ -451,3 +469,13 @@ SERVLET_CLASSES = (
FederationThirdPartyInviteExchangeServlet,
On3pidBindServlet,
)
+
+
+def register_servlets(hs, resource, authenticator, ratelimiter):
+ for servletclass in SERVLET_CLASSES:
+ servletclass(
+ handler=hs.get_replication_layer(),
+ authenticator=authenticator,
+ ratelimiter=ratelimiter,
+ server_name=hs.hostname,
+ ).register(resource)
|