diff options
author | Erik Johnston <erik@matrix.org> | 2018-07-31 13:52:49 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2018-07-31 14:32:20 +0100 |
commit | d81602b75afc2c39314da1f9a21be436134e5361 (patch) | |
tree | 9201eefa2e78580e33141cb15b8a0f3bc0a80a67 | |
parent | synapse grafana dashboard (diff) | |
download | synapse-d81602b75afc2c39314da1f9a21be436134e5361.tar.xz |
Add helper base class for generating new replication endpoints
This will hopefully reduce the boiler plate required to implement new internal HTTP requests.
-rw-r--r-- | synapse/replication/http/_base.py | 208 |
1 files changed, 208 insertions, 0 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py new file mode 100644 index 0000000000..24f00d95fc --- /dev/null +++ b/synapse/replication/http/_base.py @@ -0,0 +1,208 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import logging +import re + +from six.moves import urllib + +from twisted.internet import defer + +from synapse.api.errors import ( + CodeMessageException, + MatrixCodeMessageException, + SynapseError, +) +from synapse.util.caches.response_cache import ResponseCache +from synapse.util.stringutils import random_string + +logger = logging.getLogger(__name__) + + +class ReplicationEndpoint(object): + """Helper base class for defining new replication HTTP endpoints. + + This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..` + (with an `/:txn_id` prefix for cached requests.), where NAME is a name, + PATH_ARGS are a tuple of parameters to be encoded in the URL. + + For example, if `NAME` is "send_event" and `PATH_ARGS` is `("event_id",)`, + with `CACHE` set to true then this generates an endpoint: + + /_synapse/replication/send_event/:event_id/:txn_id + + For POST requests the payload is serialized to json and sent as the body, + while for GET requests the payload is added as query parameters. See + `_serialize_payload` for details. + + Incoming requests are handled by overriding `_handle_request`. Servers + must call `register` to register the path with the HTTP server. + + Requests can be sent by calling the client returned by `make_client`. + + Attributes: + NAME (str): A name for the endpoint, added to the path as well as used + in logging and metrics. + PATH_ARGS (tuple[str]): A list of parameters to be added to the path. + Adding parameters to the path (rather than payload) can make it + easier to follow along in the log files. + POST (bool): True to use POST request with JSON body, or false to use + GET requests with query params. + CACHE (bool): Whether server should cache the result of the request/ + If true then transparently adds a txn_id to all requests, and + `_handle_request` must return a Deferred. + RETRY_ON_TIMEOUT(bool): Whether or not to retry the request when a 504 + is received. + """ + + __metaclass__ = abc.ABCMeta + + NAME = abc.abstractproperty() + PATH_ARGS = abc.abstractproperty() + + POST = True + CACHE = True + RETRY_ON_TIMEOUT = True + + def __init__(self, hs): + if self.CACHE: + self.response_cache = ResponseCache( + hs, "repl." + self.NAME, + timeout_ms=30 * 60 * 1000, + ) + + @abc.abstractmethod + def _serialize_payload(**kwargs): + """Static method that is called when creating a request. + + Concrete implementations should have explicit parameters (rather than + kwargs) so that an appropriate exception is raised if the client is + called with unexpected parameters. All PATH_ARGS must appear in + argument list. + + Returns: + Deferred[dict]|dict: If POST request then dictionary must be JSON + serialisable, otherwise must be appropriate for adding as query + args. + """ + return {} + + @abc.abstractmethod + def _handle_request(self, request, **kwargs): + """Handle incoming request. + + This is called with the request object and PATH_ARGS. + + Returns: + Deferred[dict]: A JSON serialisable dict to be used as response + body of request. + """ + pass + + @classmethod + def make_client(cls, hs): + """Create a client that makes requests. + + Returns a callable that accepts the same parameters as `_serialize_payload`. + """ + clock = hs.get_clock() + host = hs.config.worker_replication_host + port = hs.config.worker_replication_http_port + + client = hs.get_simple_http_client() + + @defer.inlineCallbacks + def send_request(**kwargs): + data = yield cls._serialize_payload(**kwargs) + + url_args = [urllib.parse.quote(kwargs[name]) for name in cls.PATH_ARGS] + + if cls.CACHE: + txn_id = random_string(10) + url_args.append(txn_id) + + if cls.POST: + request_func = client.post_json_get_json + else: + request_func = client.get_json + + uri = "http://%s:%s/_synapse/replication/%s/%s" % ( + host, port, cls.NAME, "/".join(url_args) + ) + + try: + # We keep retrying the same request for timeouts. This is so that we + # have a good idea that the request has either succeeded or failed on + # the master, and so whether we should clean up or not. + while True: + try: + result = yield request_func(uri, data) + break + except CodeMessageException as e: + if e.code != 504 or not cls.RETRY_ON_TIMEOUT: + raise + + logger.warn("send_federation_events_to_master request timed out") + + # If we timed out we probably don't need to worry about backing + # off too much, but lets just wait a little anyway. + yield clock.sleep(1) + except MatrixCodeMessageException as e: + # We convert to SynapseError as we know that it was a SynapseError + # on the master process that we should send to the client. (And + # importantly, not stack traces everywhere) + raise SynapseError(e.code, e.msg, e.errcode) + + defer.returnValue(result) + + return send_request + + def register(self, http_server): + """Called by the server to register this as a handler to the + appropriate path. + """ + + url_args = list(self.PATH_ARGS) + method = "GET" + handler = self._handle_request + if self.POST: + method = "POST" + + if self.CACHE: + handler = self._cached_handler + url_args.append("txn_id") + + args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args) + pattern = re.compile("^/_synapse/replication/%s/%s$" % ( + self.NAME, + args + )) + + http_server.register_paths(method, [pattern], handler) + + def _cached_handler(self, request, txn_id, **kwargs): + """Wraps `_handle_request` the responses should be cached. + """ + # We just use the txn_id here, but we probably also want to use the + # other PATH_ARGS as well. + + assert self.CACHE + + return self.response_cache.wrap( + txn_id, + self._handle_request, + request, **kwargs + ) |