summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/message.py11
-rw-r--r--synapse/handlers/room_member_worker.py41
-rw-r--r--synapse/replication/http/_base.py207
-rw-r--r--synapse/replication/http/membership.py262
-rw-r--r--synapse/replication/http/send_event.py111
5 files changed, 355 insertions, 277 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 39d7724778..bcb093ba3e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -30,7 +30,7 @@ from synapse.api.urls import ConsentURIBuilder
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
-from synapse.replication.http.send_event import send_event_to_master
+from synapse.replication.http.send_event import ReplicationSendEventRestServlet
 from synapse.types import RoomAlias, UserID
 from synapse.util.async import Linearizer
 from synapse.util.frozenutils import frozendict_json_encoder
@@ -171,7 +171,7 @@ class EventCreationHandler(object):
         self.notifier = hs.get_notifier()
         self.config = hs.config
 
-        self.http_client = hs.get_simple_http_client()
+        self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
 
         # This is only used to get at ratelimit function, and maybe_kick_guest_users
         self.base_handler = BaseHandler(hs)
@@ -559,12 +559,9 @@ class EventCreationHandler(object):
         try:
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
-                yield send_event_to_master(
-                    clock=self.hs.get_clock(),
+                yield self.send_event_to_master(
+                    event_id=event.event_id,
                     store=self.store,
-                    client=self.http_client,
-                    host=self.config.worker_replication_host,
-                    port=self.config.worker_replication_http_port,
                     requester=requester,
                     event=event,
                     context=context,
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 22d8b4b0d3..acc6eb8099 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -20,16 +20,24 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError
 from synapse.handlers.room_member import RoomMemberHandler
 from synapse.replication.http.membership import (
-    get_or_register_3pid_guest,
-    notify_user_membership_change,
-    remote_join,
-    remote_reject_invite,
+    ReplicationRegister3PIDGuestRestServlet as Repl3PID,
+    ReplicationRemoteJoinRestServlet as ReplRemoteJoin,
+    ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite,
+    ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft,
 )
 
 logger = logging.getLogger(__name__)
 
 
 class RoomMemberWorkerHandler(RoomMemberHandler):
+    def __init__(self, hs):
+        super(RoomMemberWorkerHandler, self).__init__(hs)
+
+        self._get_register_3pid_client = Repl3PID.make_client(hs)
+        self._remote_join_client = ReplRemoteJoin.make_client(hs)
+        self._remote_reject_client = ReplRejectInvite.make_client(hs)
+        self._notify_change_client = ReplJoinedLeft.make_client(hs)
+
     @defer.inlineCallbacks
     def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
         """Implements RoomMemberHandler._remote_join
@@ -37,10 +45,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
         if len(remote_room_hosts) == 0:
             raise SynapseError(404, "No known servers")
 
-        ret = yield remote_join(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        ret = yield self._remote_join_client(
             requester=requester,
             remote_room_hosts=remote_room_hosts,
             room_id=room_id,
@@ -55,10 +60,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
     def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
         """Implements RoomMemberHandler._remote_reject_invite
         """
-        return remote_reject_invite(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        return self._remote_reject_client(
             requester=requester,
             remote_room_hosts=remote_room_hosts,
             room_id=room_id,
@@ -68,10 +70,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
     def _user_joined_room(self, target, room_id):
         """Implements RoomMemberHandler._user_joined_room
         """
-        return notify_user_membership_change(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        return self._notify_change_client(
             user_id=target.to_string(),
             room_id=room_id,
             change="joined",
@@ -80,10 +79,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
     def _user_left_room(self, target, room_id):
         """Implements RoomMemberHandler._user_left_room
         """
-        return notify_user_membership_change(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        return self._notify_change_client(
             user_id=target.to_string(),
             room_id=room_id,
             change="left",
@@ -92,10 +88,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
     def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
         """Implements RoomMemberHandler.get_or_register_3pid_guest
         """
-        return get_or_register_3pid_guest(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        return self._get_register_3pid_client(
             requester=requester,
             medium=medium,
             address=address,
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
new file mode 100644
index 0000000000..a7d1b2dabe
--- /dev/null
+++ b/synapse/replication/http/_base.py
@@ -0,0 +1,207 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+import logging
+import re
+
+from six.moves import urllib
+
+from twisted.internet import defer
+
+from synapse.api.errors import (
+    CodeMessageException,
+    HttpResponseException,
+)
+from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.stringutils import random_string
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationEndpoint(object):
+    """Helper base class for defining new replication HTTP endpoints.
+
+    This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..`
+    (with an `/:txn_id` prefix for cached requests.), where NAME is a name,
+    PATH_ARGS are a tuple of parameters to be encoded in the URL.
+
+    For example, if `NAME` is "send_event" and `PATH_ARGS` is `("event_id",)`,
+    with `CACHE` set to true then this generates an endpoint:
+
+        /_synapse/replication/send_event/:event_id/:txn_id
+
+    For POST requests the payload is serialized to json and sent as the body,
+    while for GET requests the payload is added as query parameters. See
+    `_serialize_payload` for details.
+
+    Incoming requests are handled by overriding `_handle_request`. Servers
+    must call `register` to register the path with the HTTP server.
+
+    Requests can be sent by calling the client returned by `make_client`.
+
+    Attributes:
+        NAME (str): A name for the endpoint, added to the path as well as used
+            in logging and metrics.
+        PATH_ARGS (tuple[str]): A list of parameters to be added to the path.
+            Adding parameters to the path (rather than payload) can make it
+            easier to follow along in the log files.
+        POST (bool): True to use POST request with JSON body, or false to use
+            GET requests with query params.
+        CACHE (bool): Whether server should cache the result of the request/
+            If true then transparently adds a txn_id to all requests, and
+            `_handle_request` must return a Deferred.
+        RETRY_ON_TIMEOUT(bool): Whether or not to retry the request when a 504
+            is received.
+    """
+
+    __metaclass__ = abc.ABCMeta
+
+    NAME = abc.abstractproperty()
+    PATH_ARGS = abc.abstractproperty()
+
+    POST = True
+    CACHE = True
+    RETRY_ON_TIMEOUT = True
+
+    def __init__(self, hs):
+        if self.CACHE:
+            self.response_cache = ResponseCache(
+                hs, "repl." + self.NAME,
+                timeout_ms=30 * 60 * 1000,
+            )
+
+    @abc.abstractmethod
+    def _serialize_payload(**kwargs):
+        """Static method that is called when creating a request.
+
+        Concrete implementations should have explicit parameters (rather than
+        kwargs) so that an appropriate exception is raised if the client is
+        called with unexpected parameters. All PATH_ARGS must appear in
+        argument list.
+
+        Returns:
+            Deferred[dict]|dict: If POST request then dictionary must be JSON
+            serialisable, otherwise must be appropriate for adding as query
+            args.
+        """
+        return {}
+
+    @abc.abstractmethod
+    def _handle_request(self, request, **kwargs):
+        """Handle incoming request.
+
+        This is called with the request object and PATH_ARGS.
+
+        Returns:
+            Deferred[dict]: A JSON serialisable dict to be used as response
+            body of request.
+        """
+        pass
+
+    @classmethod
+    def make_client(cls, hs):
+        """Create a client that makes requests.
+
+        Returns a callable that accepts the same parameters as `_serialize_payload`.
+        """
+        clock = hs.get_clock()
+        host = hs.config.worker_replication_host
+        port = hs.config.worker_replication_http_port
+
+        client = hs.get_simple_http_client()
+
+        @defer.inlineCallbacks
+        def send_request(**kwargs):
+            data = yield cls._serialize_payload(**kwargs)
+
+            url_args = [urllib.parse.quote(kwargs[name]) for name in cls.PATH_ARGS]
+
+            if cls.CACHE:
+                txn_id = random_string(10)
+                url_args.append(txn_id)
+
+            if cls.POST:
+                request_func = client.post_json_get_json
+            else:
+                request_func = client.get_json
+
+            uri = "http://%s:%s/_synapse/replication/%s/%s" % (
+                host, port, cls.NAME, "/".join(url_args)
+            )
+
+            try:
+                # We keep retrying the same request for timeouts. This is so that we
+                # have a good idea that the request has either succeeded or failed on
+                # the master, and so whether we should clean up or not.
+                while True:
+                    try:
+                        result = yield request_func(uri, data)
+                        break
+                    except CodeMessageException as e:
+                        if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
+                            raise
+
+                    logger.warn("send_federation_events_to_master request timed out")
+
+                    # If we timed out we probably don't need to worry about backing
+                    # off too much, but lets just wait a little anyway.
+                    yield clock.sleep(1)
+            except HttpResponseException as e:
+                # We convert to SynapseError as we know that it was a SynapseError
+                # on the master process that we should send to the client. (And
+                # importantly, not stack traces everywhere)
+                raise e.to_synapse_error()
+
+            defer.returnValue(result)
+
+        return send_request
+
+    def register(self, http_server):
+        """Called by the server to register this as a handler to the
+        appropriate path.
+        """
+
+        url_args = list(self.PATH_ARGS)
+        method = "GET"
+        handler = self._handle_request
+        if self.POST:
+            method = "POST"
+
+        if self.CACHE:
+            handler = self._cached_handler
+            url_args.append("txn_id")
+
+        args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
+        pattern = re.compile("^/_synapse/replication/%s/%s$" % (
+            self.NAME,
+            args
+        ))
+
+        http_server.register_paths(method, [pattern], handler)
+
+    def _cached_handler(self, request, txn_id, **kwargs):
+        """Wraps `_handle_request` the responses should be cached.
+        """
+        # We just use the txn_id here, but we probably also want to use the
+        # other PATH_ARGS as well.
+
+        assert self.CACHE
+
+        return self.response_cache.wrap(
+            txn_id,
+            self._handle_request,
+            request, **kwargs
+        )
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index 7a3cfb159c..8ad83e8421 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -14,182 +14,53 @@
 # limitations under the License.
 
 import logging
-import re
 
 from twisted.internet import defer
 
-from synapse.api.errors import HttpResponseException
-from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
 from synapse.types import Requester, UserID
 from synapse.util.distributor import user_joined_room, user_left_room
 
 logger = logging.getLogger(__name__)
 
 
-@defer.inlineCallbacks
-def remote_join(client, host, port, requester, remote_room_hosts,
-                room_id, user_id, content):
-    """Ask the master to do a remote join for the given user to the given room
-
-    Args:
-        client (SimpleHttpClient)
-        host (str): host of master
-        port (int): port on master listening for HTTP replication
-        requester (Requester)
-        remote_room_hosts (list[str]): Servers to try and join via
-        room_id (str)
-        user_id (str)
-        content (dict): The event content to use for the join event
-
-    Returns:
-        Deferred
-    """
-    uri = "http://%s:%s/_synapse/replication/remote_join" % (host, port)
-
-    payload = {
-        "requester": requester.serialize(),
-        "remote_room_hosts": remote_room_hosts,
-        "room_id": room_id,
-        "user_id": user_id,
-        "content": content,
-    }
-
-    try:
-        result = yield client.post_json_get_json(uri, payload)
-    except HttpResponseException as e:
-        # We convert to SynapseError as we know that it was a SynapseError
-        # on the master process that we should send to the client. (And
-        # importantly, not stack traces everywhere)
-        raise e.to_synapse_error()
-    defer.returnValue(result)
-
-
-@defer.inlineCallbacks
-def remote_reject_invite(client, host, port, requester, remote_room_hosts,
-                         room_id, user_id):
-    """Ask master to reject the invite for the user and room.
-
-    Args:
-        client (SimpleHttpClient)
-        host (str): host of master
-        port (int): port on master listening for HTTP replication
-        requester (Requester)
-        remote_room_hosts (list[str]): Servers to try and reject via
-        room_id (str)
-        user_id (str)
-
-    Returns:
-        Deferred
-    """
-    uri = "http://%s:%s/_synapse/replication/remote_reject_invite" % (host, port)
-
-    payload = {
-        "requester": requester.serialize(),
-        "remote_room_hosts": remote_room_hosts,
-        "room_id": room_id,
-        "user_id": user_id,
-    }
-
-    try:
-        result = yield client.post_json_get_json(uri, payload)
-    except HttpResponseException as e:
-        # We convert to SynapseError as we know that it was a SynapseError
-        # on the master process that we should send to the client. (And
-        # importantly, not stack traces everywhere)
-        raise e.to_synapse_error()
-    defer.returnValue(result)
-
-
-@defer.inlineCallbacks
-def get_or_register_3pid_guest(client, host, port, requester,
-                               medium, address, inviter_user_id):
-    """Ask the master to get/create a guest account for given 3PID.
-
-    Args:
-        client (SimpleHttpClient)
-        host (str): host of master
-        port (int): port on master listening for HTTP replication
-        requester (Requester)
-        medium (str)
-        address (str)
-        inviter_user_id (str): The user ID who is trying to invite the
-            3PID
-
-    Returns:
-        Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
-        3PID guest account.
-    """
-
-    uri = "http://%s:%s/_synapse/replication/get_or_register_3pid_guest" % (host, port)
-
-    payload = {
-        "requester": requester.serialize(),
-        "medium": medium,
-        "address": address,
-        "inviter_user_id": inviter_user_id,
-    }
-
-    try:
-        result = yield client.post_json_get_json(uri, payload)
-    except HttpResponseException as e:
-        # We convert to SynapseError as we know that it was a SynapseError
-        # on the master process that we should send to the client. (And
-        # importantly, not stack traces everywhere)
-        raise e.to_synapse_error()
-    defer.returnValue(result)
-
-
-@defer.inlineCallbacks
-def notify_user_membership_change(client, host, port, user_id, room_id, change):
-    """Notify master that a user has joined or left the room
-
-    Args:
-        client (SimpleHttpClient)
-        host (str): host of master
-        port (int): port on master listening for HTTP replication.
-        user_id (str)
-        room_id (str)
-        change (str): Either "join" or "left"
-
-    Returns:
-        Deferred
+class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
+    """Does a remote join for the given user to the given room
     """
-    assert change in ("joined", "left")
-
-    uri = "http://%s:%s/_synapse/replication/user_%s_room" % (host, port, change)
-
-    payload = {
-        "user_id": user_id,
-        "room_id": room_id,
-    }
-
-    try:
-        result = yield client.post_json_get_json(uri, payload)
-    except HttpResponseException as e:
-        # We convert to SynapseError as we know that it was a SynapseError
-        # on the master process that we should send to the client. (And
-        # importantly, not stack traces everywhere)
-        raise e.to_synapse_error()
-    defer.returnValue(result)
-
 
-class ReplicationRemoteJoinRestServlet(RestServlet):
-    PATTERNS = [re.compile("^/_synapse/replication/remote_join$")]
+    NAME = "remote_join"
+    PATH_ARGS = ("room_id", "user_id",)
 
     def __init__(self, hs):
-        super(ReplicationRemoteJoinRestServlet, self).__init__()
+        super(ReplicationRemoteJoinRestServlet, self).__init__(hs)
 
         self.federation_handler = hs.get_handlers().federation_handler
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
 
+    @staticmethod
+    def _serialize_payload(requester, room_id, user_id, remote_room_hosts,
+                           content):
+        """
+        Args:
+            requester(Requester)
+            room_id (str)
+            user_id (str)
+            remote_room_hosts (list[str]): Servers to try and join via
+            content(dict): The event content to use for the join event
+        """
+        return {
+            "requester": requester.serialize(),
+            "remote_room_hosts": remote_room_hosts,
+            "content": content,
+        }
+
     @defer.inlineCallbacks
-    def on_POST(self, request):
+    def _handle_request(self, request, room_id, user_id):
         content = parse_json_object_from_request(request)
 
         remote_room_hosts = content["remote_room_hosts"]
-        room_id = content["room_id"]
-        user_id = content["user_id"]
         event_content = content["content"]
 
         requester = Requester.deserialize(self.store, content["requester"])
@@ -212,23 +83,39 @@ class ReplicationRemoteJoinRestServlet(RestServlet):
         defer.returnValue((200, {}))
 
 
-class ReplicationRemoteRejectInviteRestServlet(RestServlet):
-    PATTERNS = [re.compile("^/_synapse/replication/remote_reject_invite$")]
+class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
+    """Rejects the invite for the user and room.
+    """
+
+    NAME = "remote_reject_invite"
+    PATH_ARGS = ("room_id", "user_id",)
 
     def __init__(self, hs):
-        super(ReplicationRemoteRejectInviteRestServlet, self).__init__()
+        super(ReplicationRemoteRejectInviteRestServlet, self).__init__(hs)
 
         self.federation_handler = hs.get_handlers().federation_handler
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
 
+    @staticmethod
+    def _serialize_payload(requester, room_id, user_id, remote_room_hosts):
+        """
+        Args:
+            requester(Requester)
+            room_id (str)
+            user_id (str)
+            remote_room_hosts (list[str]): Servers to try and reject via
+        """
+        return {
+            "requester": requester.serialize(),
+            "remote_room_hosts": remote_room_hosts,
+        }
+
     @defer.inlineCallbacks
-    def on_POST(self, request):
+    def _handle_request(self, request, room_id, user_id):
         content = parse_json_object_from_request(request)
 
         remote_room_hosts = content["remote_room_hosts"]
-        room_id = content["room_id"]
-        user_id = content["user_id"]
 
         requester = Requester.deserialize(self.store, content["requester"])
 
@@ -264,18 +151,39 @@ class ReplicationRemoteRejectInviteRestServlet(RestServlet):
         defer.returnValue((200, ret))
 
 
-class ReplicationRegister3PIDGuestRestServlet(RestServlet):
-    PATTERNS = [re.compile("^/_synapse/replication/get_or_register_3pid_guest$")]
+class ReplicationRegister3PIDGuestRestServlet(ReplicationEndpoint):
+    """Gets/creates a guest account for given 3PID.
+    """
+
+    NAME = "get_or_register_3pid_guest"
+    PATH_ARGS = ()
 
     def __init__(self, hs):
-        super(ReplicationRegister3PIDGuestRestServlet, self).__init__()
+        super(ReplicationRegister3PIDGuestRestServlet, self).__init__(hs)
 
         self.registeration_handler = hs.get_handlers().registration_handler
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
 
+    @staticmethod
+    def _serialize_payload(requester, medium, address, inviter_user_id):
+        """
+        Args:
+            requester(Requester)
+            medium (str)
+            address (str)
+            inviter_user_id (str): The user ID who is trying to invite the
+                3PID
+        """
+        return {
+            "requester": requester.serialize(),
+            "medium": medium,
+            "address": address,
+            "inviter_user_id": inviter_user_id,
+        }
+
     @defer.inlineCallbacks
-    def on_POST(self, request):
+    def _handle_request(self, request):
         content = parse_json_object_from_request(request)
 
         medium = content["medium"]
@@ -296,23 +204,35 @@ class ReplicationRegister3PIDGuestRestServlet(RestServlet):
         defer.returnValue((200, ret))
 
 
-class ReplicationUserJoinedLeftRoomRestServlet(RestServlet):
-    PATTERNS = [re.compile("^/_synapse/replication/user_(?P<change>joined|left)_room$")]
+class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
+    """Notifies that a user has joined or left the room
+    """
+
+    NAME = "membership_change"
+    PATH_ARGS = ("room_id", "user_id", "change")
+    CACHE = False  # No point caching as should return instantly.
 
     def __init__(self, hs):
-        super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__()
+        super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__(hs)
 
         self.registeration_handler = hs.get_handlers().registration_handler
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
         self.distributor = hs.get_distributor()
 
-    def on_POST(self, request, change):
-        content = parse_json_object_from_request(request)
+    @staticmethod
+    def _serialize_payload(room_id, user_id, change):
+        """
+        Args:
+            room_id (str)
+            user_id (str)
+            change (str): Either "joined" or "left"
+        """
+        assert change in ("joined", "left",)
 
-        user_id = content["user_id"]
-        room_id = content["room_id"]
+        return {}
 
+    def _handle_request(self, request, room_id, user_id, change):
         logger.info("user membership change: %s in %s", user_id, room_id)
 
         user = UserID.from_string(user_id)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index d3509dc288..50810d94cb 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -14,86 +14,26 @@
 # limitations under the License.
 
 import logging
-import re
 
 from twisted.internet import defer
 
-from synapse.api.errors import CodeMessageException, HttpResponseException
 from synapse.events import FrozenEvent
 from synapse.events.snapshot import EventContext
-from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
 from synapse.types import Requester, UserID
-from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
 
-@defer.inlineCallbacks
-def send_event_to_master(clock, store, client, host, port, requester, event, context,
-                         ratelimit, extra_users):
-    """Send event to be handled on the master
-
-    Args:
-        clock (synapse.util.Clock)
-        store (DataStore)
-        client (SimpleHttpClient)
-        host (str): host of master
-        port (int): port on master listening for HTTP replication
-        requester (Requester)
-        event (FrozenEvent)
-        context (EventContext)
-        ratelimit (bool)
-        extra_users (list(UserID)): Any extra users to notify about event
-    """
-    uri = "http://%s:%s/_synapse/replication/send_event/%s" % (
-        host, port, event.event_id,
-    )
-
-    serialized_context = yield context.serialize(event, store)
-
-    payload = {
-        "event": event.get_pdu_json(),
-        "internal_metadata": event.internal_metadata.get_dict(),
-        "rejected_reason": event.rejected_reason,
-        "context": serialized_context,
-        "requester": requester.serialize(),
-        "ratelimit": ratelimit,
-        "extra_users": [u.to_string() for u in extra_users],
-    }
-
-    try:
-        # We keep retrying the same request for timeouts. This is so that we
-        # have a good idea that the request has either succeeded or failed on
-        # the master, and so whether we should clean up or not.
-        while True:
-            try:
-                result = yield client.put_json(uri, payload)
-                break
-            except CodeMessageException as e:
-                if e.code != 504:
-                    raise
-
-            logger.warn("send_event request timed out")
-
-            # If we timed out we probably don't need to worry about backing
-            # off too much, but lets just wait a little anyway.
-            yield clock.sleep(1)
-    except HttpResponseException as e:
-        # We convert to SynapseError as we know that it was a SynapseError
-        # on the master process that we should send to the client. (And
-        # importantly, not stack traces everywhere)
-        raise e.to_synapse_error()
-    defer.returnValue(result)
-
-
-class ReplicationSendEventRestServlet(RestServlet):
+class ReplicationSendEventRestServlet(ReplicationEndpoint):
     """Handles events newly created on workers, including persisting and
     notifying.
 
     The API looks like:
 
-        POST /_synapse/replication/send_event/:event_id
+        POST /_synapse/replication/send_event/:event_id/:txn_id
 
         {
             "event": { .. serialized event .. },
@@ -105,27 +45,48 @@ class ReplicationSendEventRestServlet(RestServlet):
             "extra_users": [],
         }
     """
-    PATTERNS = [re.compile("^/_synapse/replication/send_event/(?P<event_id>[^/]+)$")]
+    NAME = "send_event"
+    PATH_ARGS = ("event_id",)
+    POST = True
 
     def __init__(self, hs):
-        super(ReplicationSendEventRestServlet, self).__init__()
+        super(ReplicationSendEventRestServlet, self).__init__(hs)
 
         self.event_creation_handler = hs.get_event_creation_handler()
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
 
-        # The responses are tiny, so we may as well cache them for a while
-        self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
+    @staticmethod
+    @defer.inlineCallbacks
+    def _serialize_payload(event_id, store, event, context, requester,
+                           ratelimit, extra_users):
+        """
+        Args:
+            event_id (str)
+            store (DataStore)
+            requester (Requester)
+            event (FrozenEvent)
+            context (EventContext)
+            ratelimit (bool)
+            extra_users (list(UserID)): Any extra users to notify about event
+        """
+
+        serialized_context = yield context.serialize(event, store)
+
+        payload = {
+            "event": event.get_pdu_json(),
+            "internal_metadata": event.internal_metadata.get_dict(),
+            "rejected_reason": event.rejected_reason,
+            "context": serialized_context,
+            "requester": requester.serialize(),
+            "ratelimit": ratelimit,
+            "extra_users": [u.to_string() for u in extra_users],
+        }
 
-    def on_PUT(self, request, event_id):
-        return self.response_cache.wrap(
-            event_id,
-            self._handle_request,
-            request
-        )
+        defer.returnValue(payload)
 
     @defer.inlineCallbacks
-    def _handle_request(self, request):
+    def _handle_request(self, request, event_id):
         with Measure(self.clock, "repl_send_event_parse"):
             content = parse_json_object_from_request(request)