summary refs log tree commit diff
path: root/synapse/federation/transport/client.py
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2015-01-16 13:21:14 +0000
committerMark Haines <mark.haines@matrix.org>2015-01-16 19:01:03 +0000
commit602684eac5b7acf61e10d7fabed4977635d3fb46 (patch)
tree31e3ced4a46f1d38d1831f39c5ab677c17d4dd79 /synapse/federation/transport/client.py
parentRemove temporary debug logging that was accidentally committed (diff)
downloadsynapse-602684eac5b7acf61e10d7fabed4977635d3fb46.tar.xz
Split transport layer into client and server parts
Diffstat (limited to 'synapse/federation/transport/client.py')
-rw-r--r--synapse/federation/transport/client.py257
1 files changed, 257 insertions, 0 deletions
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
new file mode 100644
index 0000000000..604ade683b
--- /dev/null
+++ b/synapse/federation/transport/client.py
@@ -0,0 +1,257 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.urls import FEDERATION_PREFIX as PREFIX
+from synapse.util.logutils import log_function
+
+import logging
+import json
+
+
+logger = logging.getLogger(__name__)
+
+
+class TransportLayerClient(object):
+    """Sends federation HTTP requests to other servers"""
+
+    @log_function
+    def get_context_state(self, destination, context, event_id=None):
+        """ Requests all state for a given context (i.e. room) from the
+        given server.
+
+        Args:
+            destination (str): The host name of the remote home server we want
+                to get the state from.
+            context (str): The name of the context we want the state of
+
+        Returns:
+            Deferred: Results in a dict received from the remote homeserver.
+        """
+        logger.debug("get_context_state dest=%s, context=%s",
+                     destination, context)
+
+        subpath = "/state/%s/" % context
+
+        args = {}
+        if event_id:
+            args["event_id"] = event_id
+
+        return self._do_request_for_transaction(
+            destination, subpath, args=args
+        )
+
+    @log_function
+    def get_event(self, destination, event_id):
+        """ Requests the pdu with give id and origin from the given server.
+
+        Args:
+            destination (str): The host name of the remote home server we want
+                to get the state from.
+            event_id (str): The id of the event being requested.
+
+        Returns:
+            Deferred: Results in a dict received from the remote homeserver.
+        """
+        logger.debug("get_pdu dest=%s, event_id=%s",
+                     destination, event_id)
+
+        subpath = "/event/%s/" % (event_id, )
+
+        return self._do_request_for_transaction(destination, subpath)
+
+    @log_function
+    def backfill(self, dest, context, event_tuples, limit):
+        """ Requests `limit` previous PDUs in a given context before list of
+        PDUs.
+
+        Args:
+            dest (str)
+            context (str)
+            event_tuples (list)
+            limt (int)
+
+        Returns:
+            Deferred: Results in a dict received from the remote homeserver.
+        """
+        logger.debug(
+            "backfill dest=%s, context=%s, event_tuples=%s, limit=%s",
+            dest, context, repr(event_tuples), str(limit)
+        )
+
+        if not event_tuples:
+            # TODO: raise?
+            return
+
+        subpath = "/backfill/%s/" % (context,)
+
+        args = {
+            "v": event_tuples,
+            "limit": [str(limit)],
+        }
+
+        return self._do_request_for_transaction(
+            dest,
+            subpath,
+            args=args,
+        )
+
+    @defer.inlineCallbacks
+    @log_function
+    def send_transaction(self, transaction, json_data_callback=None):
+        """ Sends the given Transaction to its destination
+
+        Args:
+            transaction (Transaction)
+
+        Returns:
+            Deferred: Results of the deferred is a tuple in the form of
+            (response_code, response_body) where the response_body is a
+            python dict decoded from json
+        """
+        logger.debug(
+            "send_data dest=%s, txid=%s",
+            transaction.destination, transaction.transaction_id
+        )
+
+        if transaction.destination == self.server_name:
+            raise RuntimeError("Transport layer cannot send to itself!")
+
+        # FIXME: This is only used by the tests. The actual json sent is
+        # generated by the json_data_callback.
+        json_data = transaction.get_dict()
+
+        code, response = yield self.client.put_json(
+            transaction.destination,
+            path=PREFIX + "/send/%s/" % transaction.transaction_id,
+            data=json_data,
+            json_data_callback=json_data_callback,
+        )
+
+        logger.debug(
+            "send_data dest=%s, txid=%s, got response: %d",
+            transaction.destination, transaction.transaction_id, code
+        )
+
+        defer.returnValue((code, response))
+
+    @defer.inlineCallbacks
+    @log_function
+    def make_query(self, destination, query_type, args, retry_on_dns_fail):
+        path = PREFIX + "/query/%s" % query_type
+
+        response = yield self.client.get_json(
+            destination=destination,
+            path=path,
+            args=args,
+            retry_on_dns_fail=retry_on_dns_fail,
+        )
+
+        defer.returnValue(response)
+
+    @defer.inlineCallbacks
+    @log_function
+    def make_join(self, destination, context, user_id, retry_on_dns_fail=True):
+        path = PREFIX + "/make_join/%s/%s" % (context, user_id,)
+
+        response = yield self.client.get_json(
+            destination=destination,
+            path=path,
+            retry_on_dns_fail=retry_on_dns_fail,
+        )
+
+        defer.returnValue(response)
+
+    @defer.inlineCallbacks
+    @log_function
+    def send_join(self, destination, context, event_id, content):
+        path = PREFIX + "/send_join/%s/%s" % (
+            context,
+            event_id,
+        )
+
+        code, content = yield self.client.put_json(
+            destination=destination,
+            path=path,
+            data=content,
+        )
+
+        if not 200 <= code < 300:
+            raise RuntimeError("Got %d from send_join", code)
+
+        defer.returnValue(json.loads(content))
+
+    @defer.inlineCallbacks
+    @log_function
+    def send_invite(self, destination, context, event_id, content):
+        path = PREFIX + "/invite/%s/%s" % (
+            context,
+            event_id,
+        )
+
+        code, content = yield self.client.put_json(
+            destination=destination,
+            path=path,
+            data=content,
+        )
+
+        if not 200 <= code < 300:
+            raise RuntimeError("Got %d from send_invite", code)
+
+        defer.returnValue(json.loads(content))
+
+    @defer.inlineCallbacks
+    @log_function
+    def get_event_auth(self, destination, context, event_id):
+        path = PREFIX + "/event_auth/%s/%s" % (
+            context,
+            event_id,
+        )
+
+        response = yield self.client.get_json(
+            destination=destination,
+            path=path,
+        )
+
+        defer.returnValue(response)
+
+    @defer.inlineCallbacks
+    @log_function
+    def _do_request_for_transaction(self, destination, subpath, args={}):
+        """
+        Args:
+            destination (str)
+            path (str)
+            args (dict): This is parsed directly to the HttpClient.
+
+        Returns:
+            Deferred: Results in a dict.
+        """
+
+        data = yield self.client.get_json(
+            destination,
+            path=PREFIX + subpath,
+            args=args,
+        )
+
+        # Add certain keys to the JSON, ready for decoding as a Transaction
+        data.update(
+            origin=destination,
+            destination=self.server_name,
+            transaction_id=None
+        )
+
+        defer.returnValue(data)