summary refs log tree commit diff
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2015-01-16 13:21:14 +0000
committerMark Haines <mark.haines@matrix.org>2015-01-16 19:01:03 +0000
commit602684eac5b7acf61e10d7fabed4977635d3fb46 (patch)
tree31e3ced4a46f1d38d1831f39c5ab677c17d4dd79
parentRemove temporary debug logging that was accidentally committed (diff)
downloadsynapse-602684eac5b7acf61e10d7fabed4977635d3fb46.tar.xz
Split transport layer into client and server parts
-rw-r--r--synapse/federation/transport/__init__.py62
-rw-r--r--synapse/federation/transport/client.py257
-rw-r--r--synapse/federation/transport/server.py (renamed from synapse/federation/transport.py)274
3 files changed, 321 insertions, 272 deletions
diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py
new file mode 100644
index 0000000000..6800ac46c5
--- /dev/null
+++ b/synapse/federation/transport/__init__.py
@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""The transport layer is responsible for both sending transactions to remote
+home servers and receiving a variety of requests from other home servers.
+
+By default this is done over HTTPS (and all home servers are required to
+support HTTPS), however individual pairings of servers may decide to
+communicate over a different (albeit still reliable) protocol.
+"""
+
+from .server import TransportLayerServer
+from .client import TransportLayerClient
+
+
+class TransportLayer(TransportLayerServer, TransportLayerClient):
+    """This is a basic implementation of the transport layer that translates
+    transactions and other requests to/from HTTP.
+
+    Attributes:
+        server_name (str): Local home server host
+
+        server (synapse.http.server.HttpServer): the http server to
+                register listeners on
+
+        client (synapse.http.client.HttpClient): the http client used to
+                send requests
+
+        request_handler (TransportRequestHandler): The handler to fire when we
+            receive requests for data.
+
+        received_handler (TransportReceivedHandler): The handler to fire when
+            we receive data.
+    """
+
+    def __init__(self, homeserver, server_name, server, client):
+        """
+        Args:
+            server_name (str): Local home server host
+            server (synapse.protocol.http.HttpServer): the http server to
+                register listeners on
+            client (synapse.protocol.http.HttpClient): the http client used to
+                send requests
+        """
+        self.keyring = homeserver.get_keyring()
+        self.server_name = server_name
+        self.server = server
+        self.client = client
+        self.request_handler = None
+        self.received_handler = None
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
new file mode 100644
index 0000000000..604ade683b
--- /dev/null
+++ b/synapse/federation/transport/client.py
@@ -0,0 +1,257 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.urls import FEDERATION_PREFIX as PREFIX
+from synapse.util.logutils import log_function
+
+import logging
+import json
+
+
+logger = logging.getLogger(__name__)
+
+
+class TransportLayerClient(object):
+    """Sends federation HTTP requests to other servers"""
+
+    @log_function
+    def get_context_state(self, destination, context, event_id=None):
+        """ Requests all state for a given context (i.e. room) from the
+        given server.
+
+        Args:
+            destination (str): The host name of the remote home server we want
+                to get the state from.
+            context (str): The name of the context we want the state of
+
+        Returns:
+            Deferred: Results in a dict received from the remote homeserver.
+        """
+        logger.debug("get_context_state dest=%s, context=%s",
+                     destination, context)
+
+        subpath = "/state/%s/" % context
+
+        args = {}
+        if event_id:
+            args["event_id"] = event_id
+
+        return self._do_request_for_transaction(
+            destination, subpath, args=args
+        )
+
+    @log_function
+    def get_event(self, destination, event_id):
+        """ Requests the pdu with give id and origin from the given server.
+
+        Args:
+            destination (str): The host name of the remote home server we want
+                to get the state from.
+            event_id (str): The id of the event being requested.
+
+        Returns:
+            Deferred: Results in a dict received from the remote homeserver.
+        """
+        logger.debug("get_pdu dest=%s, event_id=%s",
+                     destination, event_id)
+
+        subpath = "/event/%s/" % (event_id, )
+
+        return self._do_request_for_transaction(destination, subpath)
+
+    @log_function
+    def backfill(self, dest, context, event_tuples, limit):
+        """ Requests `limit` previous PDUs in a given context before list of
+        PDUs.
+
+        Args:
+            dest (str)
+            context (str)
+            event_tuples (list)
+            limt (int)
+
+        Returns:
+            Deferred: Results in a dict received from the remote homeserver.
+        """
+        logger.debug(
+            "backfill dest=%s, context=%s, event_tuples=%s, limit=%s",
+            dest, context, repr(event_tuples), str(limit)
+        )
+
+        if not event_tuples:
+            # TODO: raise?
+            return
+
+        subpath = "/backfill/%s/" % (context,)
+
+        args = {
+            "v": event_tuples,
+            "limit": [str(limit)],
+        }
+
+        return self._do_request_for_transaction(
+            dest,
+            subpath,
+            args=args,
+        )
+
+    @defer.inlineCallbacks
+    @log_function
+    def send_transaction(self, transaction, json_data_callback=None):
+        """ Sends the given Transaction to its destination
+
+        Args:
+            transaction (Transaction)
+
+        Returns:
+            Deferred: Results of the deferred is a tuple in the form of
+            (response_code, response_body) where the response_body is a
+            python dict decoded from json
+        """
+        logger.debug(
+            "send_data dest=%s, txid=%s",
+            transaction.destination, transaction.transaction_id
+        )
+
+        if transaction.destination == self.server_name:
+            raise RuntimeError("Transport layer cannot send to itself!")
+
+        # FIXME: This is only used by the tests. The actual json sent is
+        # generated by the json_data_callback.
+        json_data = transaction.get_dict()
+
+        code, response = yield self.client.put_json(
+            transaction.destination,
+            path=PREFIX + "/send/%s/" % transaction.transaction_id,
+            data=json_data,
+            json_data_callback=json_data_callback,
+        )
+
+        logger.debug(
+            "send_data dest=%s, txid=%s, got response: %d",
+            transaction.destination, transaction.transaction_id, code
+        )
+
+        defer.returnValue((code, response))
+
+    @defer.inlineCallbacks
+    @log_function
+    def make_query(self, destination, query_type, args, retry_on_dns_fail):
+        path = PREFIX + "/query/%s" % query_type
+
+        response = yield self.client.get_json(
+            destination=destination,
+            path=path,
+            args=args,
+            retry_on_dns_fail=retry_on_dns_fail,
+        )
+
+        defer.returnValue(response)
+
+    @defer.inlineCallbacks
+    @log_function
+    def make_join(self, destination, context, user_id, retry_on_dns_fail=True):
+        path = PREFIX + "/make_join/%s/%s" % (context, user_id,)
+
+        response = yield self.client.get_json(
+            destination=destination,
+            path=path,
+            retry_on_dns_fail=retry_on_dns_fail,
+        )
+
+        defer.returnValue(response)
+
+    @defer.inlineCallbacks
+    @log_function
+    def send_join(self, destination, context, event_id, content):
+        path = PREFIX + "/send_join/%s/%s" % (
+            context,
+            event_id,
+        )
+
+        code, content = yield self.client.put_json(
+            destination=destination,
+            path=path,
+            data=content,
+        )
+
+        if not 200 <= code < 300:
+            raise RuntimeError("Got %d from send_join", code)
+
+        defer.returnValue(json.loads(content))
+
+    @defer.inlineCallbacks
+    @log_function
+    def send_invite(self, destination, context, event_id, content):
+        path = PREFIX + "/invite/%s/%s" % (
+            context,
+            event_id,
+        )
+
+        code, content = yield self.client.put_json(
+            destination=destination,
+            path=path,
+            data=content,
+        )
+
+        if not 200 <= code < 300:
+            raise RuntimeError("Got %d from send_invite", code)
+
+        defer.returnValue(json.loads(content))
+
+    @defer.inlineCallbacks
+    @log_function
+    def get_event_auth(self, destination, context, event_id):
+        path = PREFIX + "/event_auth/%s/%s" % (
+            context,
+            event_id,
+        )
+
+        response = yield self.client.get_json(
+            destination=destination,
+            path=path,
+        )
+
+        defer.returnValue(response)
+
+    @defer.inlineCallbacks
+    @log_function
+    def _do_request_for_transaction(self, destination, subpath, args={}):
+        """
+        Args:
+            destination (str)
+            path (str)
+            args (dict): This is parsed directly to the HttpClient.
+
+        Returns:
+            Deferred: Results in a dict.
+        """
+
+        data = yield self.client.get_json(
+            destination,
+            path=PREFIX + subpath,
+            args=args,
+        )
+
+        # Add certain keys to the JSON, ready for decoding as a Transaction
+        data.update(
+            origin=destination,
+            destination=self.server_name,
+            transaction_id=None
+        )
+
+        defer.returnValue(data)
diff --git a/synapse/federation/transport.py b/synapse/federation/transport/server.py
index 1f0f06e0fe..34b50def7d 100644
--- a/synapse/federation/transport.py
+++ b/synapse/federation/transport/server.py
@@ -13,14 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""The transport layer is responsible for both sending transactions to remote
-home servers and receiving a variety of requests from other home servers.
-
-Typically, this is done over HTTP (and all home servers are required to
-support HTTP), however individual pairings of servers may decide to communicate
-over a different (albeit still reliable) protocol.
-"""
-
 from twisted.internet import defer
 
 from synapse.api.urls import FEDERATION_PREFIX as PREFIX
@@ -35,241 +27,8 @@ import re
 logger = logging.getLogger(__name__)
 
 
-class TransportLayer(object):
-    """This is a basic implementation of the transport layer that translates
-    transactions and other requests to/from HTTP.
-
-    Attributes:
-        server_name (str): Local home server host
-
-        server (synapse.http.server.HttpServer): the http server to
-                register listeners on
-
-        client (synapse.http.client.HttpClient): the http client used to
-                send requests
-
-        request_handler (TransportRequestHandler): The handler to fire when we
-            receive requests for data.
-
-        received_handler (TransportReceivedHandler): The handler to fire when
-            we receive data.
-    """
-
-    def __init__(self, homeserver, server_name, server, client):
-        """
-        Args:
-            server_name (str): Local home server host
-            server (synapse.protocol.http.HttpServer): the http server to
-                register listeners on
-            client (synapse.protocol.http.HttpClient): the http client used to
-                send requests
-        """
-        self.keyring = homeserver.get_keyring()
-        self.server_name = server_name
-        self.server = server
-        self.client = client
-        self.request_handler = None
-        self.received_handler = None
-
-    @log_function
-    def get_context_state(self, destination, context, event_id=None):
-        """ Requests all state for a given context (i.e. room) from the
-        given server.
-
-        Args:
-            destination (str): The host name of the remote home server we want
-                to get the state from.
-            context (str): The name of the context we want the state of
-
-        Returns:
-            Deferred: Results in a dict received from the remote homeserver.
-        """
-        logger.debug("get_context_state dest=%s, context=%s",
-                     destination, context)
-
-        subpath = "/state/%s/" % context
-
-        args = {}
-        if event_id:
-            args["event_id"] = event_id
-
-        return self._do_request_for_transaction(
-            destination, subpath, args=args
-        )
-
-    @log_function
-    def get_event(self, destination, event_id):
-        """ Requests the pdu with give id and origin from the given server.
-
-        Args:
-            destination (str): The host name of the remote home server we want
-                to get the state from.
-            event_id (str): The id of the event being requested.
-
-        Returns:
-            Deferred: Results in a dict received from the remote homeserver.
-        """
-        logger.debug("get_pdu dest=%s, event_id=%s",
-                     destination, event_id)
-
-        subpath = "/event/%s/" % (event_id, )
-
-        return self._do_request_for_transaction(destination, subpath)
-
-    @log_function
-    def backfill(self, dest, context, event_tuples, limit):
-        """ Requests `limit` previous PDUs in a given context before list of
-        PDUs.
-
-        Args:
-            dest (str)
-            context (str)
-            event_tuples (list)
-            limt (int)
-
-        Returns:
-            Deferred: Results in a dict received from the remote homeserver.
-        """
-        logger.debug(
-            "backfill dest=%s, context=%s, event_tuples=%s, limit=%s",
-            dest, context, repr(event_tuples), str(limit)
-        )
-
-        if not event_tuples:
-            # TODO: raise?
-            return
-
-        subpath = "/backfill/%s/" % (context,)
-
-        args = {
-            "v": event_tuples,
-            "limit": [str(limit)],
-        }
-
-        return self._do_request_for_transaction(
-            dest,
-            subpath,
-            args=args,
-        )
-
-    @defer.inlineCallbacks
-    @log_function
-    def send_transaction(self, transaction, json_data_callback=None):
-        """ Sends the given Transaction to its destination
-
-        Args:
-            transaction (Transaction)
-
-        Returns:
-            Deferred: Results of the deferred is a tuple in the form of
-            (response_code, response_body) where the response_body is a
-            python dict decoded from json
-        """
-        logger.debug(
-            "send_data dest=%s, txid=%s",
-            transaction.destination, transaction.transaction_id
-        )
-
-        if transaction.destination == self.server_name:
-            raise RuntimeError("Transport layer cannot send to itself!")
-
-        # FIXME: This is only used by the tests. The actual json sent is
-        # generated by the json_data_callback.
-        json_data = transaction.get_dict()
-
-        code, response = yield self.client.put_json(
-            transaction.destination,
-            path=PREFIX + "/send/%s/" % transaction.transaction_id,
-            data=json_data,
-            json_data_callback=json_data_callback,
-        )
-
-        logger.debug(
-            "send_data dest=%s, txid=%s, got response: %d",
-            transaction.destination, transaction.transaction_id, code
-        )
-
-        defer.returnValue((code, response))
-
-    @defer.inlineCallbacks
-    @log_function
-    def make_query(self, destination, query_type, args, retry_on_dns_fail):
-        path = PREFIX + "/query/%s" % query_type
-
-        response = yield self.client.get_json(
-            destination=destination,
-            path=path,
-            args=args,
-            retry_on_dns_fail=retry_on_dns_fail,
-        )
-
-        defer.returnValue(response)
-
-    @defer.inlineCallbacks
-    @log_function
-    def make_join(self, destination, context, user_id, retry_on_dns_fail=True):
-        path = PREFIX + "/make_join/%s/%s" % (context, user_id,)
-
-        response = yield self.client.get_json(
-            destination=destination,
-            path=path,
-            retry_on_dns_fail=retry_on_dns_fail,
-        )
-
-        defer.returnValue(response)
-
-    @defer.inlineCallbacks
-    @log_function
-    def send_join(self, destination, context, event_id, content):
-        path = PREFIX + "/send_join/%s/%s" % (
-            context,
-            event_id,
-        )
-
-        code, content = yield self.client.put_json(
-            destination=destination,
-            path=path,
-            data=content,
-        )
-
-        if not 200 <= code < 300:
-            raise RuntimeError("Got %d from send_join", code)
-
-        defer.returnValue(json.loads(content))
-
-    @defer.inlineCallbacks
-    @log_function
-    def send_invite(self, destination, context, event_id, content):
-        path = PREFIX + "/invite/%s/%s" % (
-            context,
-            event_id,
-        )
-
-        code, content = yield self.client.put_json(
-            destination=destination,
-            path=path,
-            data=content,
-        )
-
-        if not 200 <= code < 300:
-            raise RuntimeError("Got %d from send_invite", code)
-
-        defer.returnValue(json.loads(content))
-
-    @defer.inlineCallbacks
-    @log_function
-    def get_event_auth(self, destination, context, event_id):
-        path = PREFIX + "/event_auth/%s/%s" % (
-            context,
-            event_id,
-        )
-
-        response = yield self.client.get_json(
-            destination=destination,
-            path=path,
-        )
-
-        defer.returnValue(response)
+class TransportLayerServer(object):
+    """Handles incoming federation HTTP requests"""
 
     @defer.inlineCallbacks
     def _authenticate_request(self, request):
@@ -373,8 +132,6 @@ class TransportLayer(object):
         """
         self.request_handler = handler
 
-        # TODO(markjh): Namespace the federation URI paths
-
         # This is for when someone asks us for everything since version X
         self.server.register_path(
             "GET",
@@ -528,33 +285,6 @@ class TransportLayer(object):
 
         defer.returnValue((code, response))
 
-    @defer.inlineCallbacks
-    @log_function
-    def _do_request_for_transaction(self, destination, subpath, args={}):
-        """
-        Args:
-            destination (str)
-            path (str)
-            args (dict): This is parsed directly to the HttpClient.
-
-        Returns:
-            Deferred: Results in a dict.
-        """
-
-        data = yield self.client.get_json(
-            destination,
-            path=PREFIX + subpath,
-            args=args,
-        )
-
-        # Add certain keys to the JSON, ready for decoding as a Transaction
-        data.update(
-            origin=destination,
-            destination=self.server_name,
-            transaction_id=None
-        )
-
-        defer.returnValue(data)
 
     @log_function
     def _on_backfill_request(self, origin, context, v_list, limits):