diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 39d7724778..bcb093ba3e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -30,7 +30,7 @@ from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
-from synapse.replication.http.send_event import send_event_to_master
+from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.types import RoomAlias, UserID
from synapse.util.async import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
@@ -171,7 +171,7 @@ class EventCreationHandler(object):
self.notifier = hs.get_notifier()
self.config = hs.config
- self.http_client = hs.get_simple_http_client()
+ self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)
@@ -559,12 +559,9 @@ class EventCreationHandler(object):
try:
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
- yield send_event_to_master(
- clock=self.hs.get_clock(),
+ yield self.send_event_to_master(
+ event_id=event.event_id,
store=self.store,
- client=self.http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
requester=requester,
event=event,
context=context,
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 22d8b4b0d3..acc6eb8099 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -20,16 +20,24 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.handlers.room_member import RoomMemberHandler
from synapse.replication.http.membership import (
- get_or_register_3pid_guest,
- notify_user_membership_change,
- remote_join,
- remote_reject_invite,
+ ReplicationRegister3PIDGuestRestServlet as Repl3PID,
+ ReplicationRemoteJoinRestServlet as ReplRemoteJoin,
+ ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite,
+ ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft,
)
logger = logging.getLogger(__name__)
class RoomMemberWorkerHandler(RoomMemberHandler):
+ def __init__(self, hs):
+ super(RoomMemberWorkerHandler, self).__init__(hs)
+
+ self._get_register_3pid_client = Repl3PID.make_client(hs)
+ self._remote_join_client = ReplRemoteJoin.make_client(hs)
+ self._remote_reject_client = ReplRejectInvite.make_client(hs)
+ self._notify_change_client = ReplJoinedLeft.make_client(hs)
+
@defer.inlineCallbacks
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join
@@ -37,10 +45,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")
- ret = yield remote_join(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ ret = yield self._remote_join_client(
requester=requester,
remote_room_hosts=remote_room_hosts,
room_id=room_id,
@@ -55,10 +60,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
"""Implements RoomMemberHandler._remote_reject_invite
"""
- return remote_reject_invite(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ return self._remote_reject_client(
requester=requester,
remote_room_hosts=remote_room_hosts,
room_id=room_id,
@@ -68,10 +70,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _user_joined_room(self, target, room_id):
"""Implements RoomMemberHandler._user_joined_room
"""
- return notify_user_membership_change(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ return self._notify_change_client(
user_id=target.to_string(),
room_id=room_id,
change="joined",
@@ -80,10 +79,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _user_left_room(self, target, room_id):
"""Implements RoomMemberHandler._user_left_room
"""
- return notify_user_membership_change(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ return self._notify_change_client(
user_id=target.to_string(),
room_id=room_id,
change="left",
@@ -92,10 +88,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
"""Implements RoomMemberHandler.get_or_register_3pid_guest
"""
- return get_or_register_3pid_guest(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ return self._get_register_3pid_client(
requester=requester,
medium=medium,
address=address,
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
new file mode 100644
index 0000000000..5e5376cf58
--- /dev/null
+++ b/synapse/replication/http/_base.py
@@ -0,0 +1,215 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+import logging
+import re
+
+from six.moves import urllib
+
+from twisted.internet import defer
+
+from synapse.api.errors import CodeMessageException, HttpResponseException
+from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.stringutils import random_string
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationEndpoint(object):
+ """Helper base class for defining new replication HTTP endpoints.
+
+ This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..`
+ (with an `/:txn_id` prefix for cached requests.), where NAME is a name,
+ PATH_ARGS are a tuple of parameters to be encoded in the URL.
+
+ For example, if `NAME` is "send_event" and `PATH_ARGS` is `("event_id",)`,
+ with `CACHE` set to true then this generates an endpoint:
+
+ /_synapse/replication/send_event/:event_id/:txn_id
+
+ For POST/PUT requests the payload is serialized to json and sent as the
+ body, while for GET requests the payload is added as query parameters. See
+ `_serialize_payload` for details.
+
+ Incoming requests are handled by overriding `_handle_request`. Servers
+ must call `register` to register the path with the HTTP server.
+
+ Requests can be sent by calling the client returned by `make_client`.
+
+ Attributes:
+ NAME (str): A name for the endpoint, added to the path as well as used
+ in logging and metrics.
+ PATH_ARGS (tuple[str]): A list of parameters to be added to the path.
+ Adding parameters to the path (rather than payload) can make it
+ easier to follow along in the log files.
+ METHOD (str): The method of the HTTP request, defaults to POST. Can be
+ one of POST, PUT or GET. If GET then the payload is sent as query
+ parameters rather than a JSON body.
+ CACHE (bool): Whether server should cache the result of the request/
+ If true then transparently adds a txn_id to all requests, and
+ `_handle_request` must return a Deferred.
+ RETRY_ON_TIMEOUT(bool): Whether or not to retry the request when a 504
+ is received.
+ """
+
+ __metaclass__ = abc.ABCMeta
+
+ NAME = abc.abstractproperty()
+ PATH_ARGS = abc.abstractproperty()
+
+ METHOD = "POST"
+ CACHE = True
+ RETRY_ON_TIMEOUT = True
+
+ def __init__(self, hs):
+ if self.CACHE:
+ self.response_cache = ResponseCache(
+ hs, "repl." + self.NAME,
+ timeout_ms=30 * 60 * 1000,
+ )
+
+ assert self.METHOD in ("PUT", "POST", "GET")
+
+ @abc.abstractmethod
+ def _serialize_payload(**kwargs):
+ """Static method that is called when creating a request.
+
+ Concrete implementations should have explicit parameters (rather than
+ kwargs) so that an appropriate exception is raised if the client is
+ called with unexpected parameters. All PATH_ARGS must appear in
+ argument list.
+
+ Returns:
+ Deferred[dict]|dict: If POST/PUT request then dictionary must be
+ JSON serialisable, otherwise must be appropriate for adding as
+ query args.
+ """
+ return {}
+
+ @abc.abstractmethod
+ def _handle_request(self, request, **kwargs):
+ """Handle incoming request.
+
+ This is called with the request object and PATH_ARGS.
+
+ Returns:
+ Deferred[dict]: A JSON serialisable dict to be used as response
+ body of request.
+ """
+ pass
+
+ @classmethod
+ def make_client(cls, hs):
+ """Create a client that makes requests.
+
+ Returns a callable that accepts the same parameters as `_serialize_payload`.
+ """
+ clock = hs.get_clock()
+ host = hs.config.worker_replication_host
+ port = hs.config.worker_replication_http_port
+
+ client = hs.get_simple_http_client()
+
+ @defer.inlineCallbacks
+ def send_request(**kwargs):
+ data = yield cls._serialize_payload(**kwargs)
+
+ url_args = [urllib.parse.quote(kwargs[name]) for name in cls.PATH_ARGS]
+
+ if cls.CACHE:
+ txn_id = random_string(10)
+ url_args.append(txn_id)
+
+ if cls.METHOD == "POST":
+ request_func = client.post_json_get_json
+ elif cls.METHOD == "PUT":
+ request_func = client.put_json
+ elif cls.METHOD == "GET":
+ request_func = client.get_json
+ else:
+ # We have already asserted in the constructor that a
+ # compatible was picked, but lets be paranoid.
+ raise Exception(
+ "Unknown METHOD on %s replication endpoint" % (cls.NAME,)
+ )
+
+ uri = "http://%s:%s/_synapse/replication/%s/%s" % (
+ host, port, cls.NAME, "/".join(url_args)
+ )
+
+ try:
+ # We keep retrying the same request for timeouts. This is so that we
+ # have a good idea that the request has either succeeded or failed on
+ # the master, and so whether we should clean up or not.
+ while True:
+ try:
+ result = yield request_func(uri, data)
+ break
+ except CodeMessageException as e:
+ if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
+ raise
+
+ logger.warn("%s request timed out", cls.NAME)
+
+ # If we timed out we probably don't need to worry about backing
+ # off too much, but lets just wait a little anyway.
+ yield clock.sleep(1)
+ except HttpResponseException as e:
+ # We convert to SynapseError as we know that it was a SynapseError
+ # on the master process that we should send to the client. (And
+ # importantly, not stack traces everywhere)
+ raise e.to_synapse_error()
+
+ defer.returnValue(result)
+
+ return send_request
+
+ def register(self, http_server):
+ """Called by the server to register this as a handler to the
+ appropriate path.
+ """
+
+ url_args = list(self.PATH_ARGS)
+ handler = self._handle_request
+ method = self.METHOD
+
+ if self.CACHE:
+ handler = self._cached_handler
+ url_args.append("txn_id")
+
+ args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
+ pattern = re.compile("^/_synapse/replication/%s/%s$" % (
+ self.NAME,
+ args
+ ))
+
+ http_server.register_paths(method, [pattern], handler)
+
+ def _cached_handler(self, request, txn_id, **kwargs):
+ """Called on new incoming requests when caching is enabled. Checks
+ if there is a cached response for the request and returns that,
+ otherwise calls `_handle_request` and caches its response.
+ """
+ # We just use the txn_id here, but we probably also want to use the
+ # other PATH_ARGS as well.
+
+ assert self.CACHE
+
+ return self.response_cache.wrap(
+ txn_id,
+ self._handle_request,
+ request, **kwargs
+ )
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index 7a3cfb159c..e58bebf12a 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -14,182 +14,63 @@
# limitations under the License.
import logging
-import re
from twisted.internet import defer
-from synapse.api.errors import HttpResponseException
-from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import Requester, UserID
from synapse.util.distributor import user_joined_room, user_left_room
logger = logging.getLogger(__name__)
-@defer.inlineCallbacks
-def remote_join(client, host, port, requester, remote_room_hosts,
- room_id, user_id, content):
- """Ask the master to do a remote join for the given user to the given room
+class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
+ """Does a remote join for the given user to the given room
- Args:
- client (SimpleHttpClient)
- host (str): host of master
- port (int): port on master listening for HTTP replication
- requester (Requester)
- remote_room_hosts (list[str]): Servers to try and join via
- room_id (str)
- user_id (str)
- content (dict): The event content to use for the join event
+ Request format:
- Returns:
- Deferred
- """
- uri = "http://%s:%s/_synapse/replication/remote_join" % (host, port)
-
- payload = {
- "requester": requester.serialize(),
- "remote_room_hosts": remote_room_hosts,
- "room_id": room_id,
- "user_id": user_id,
- "content": content,
- }
-
- try:
- result = yield client.post_json_get_json(uri, payload)
- except HttpResponseException as e:
- # We convert to SynapseError as we know that it was a SynapseError
- # on the master process that we should send to the client. (And
- # importantly, not stack traces everywhere)
- raise e.to_synapse_error()
- defer.returnValue(result)
-
-
-@defer.inlineCallbacks
-def remote_reject_invite(client, host, port, requester, remote_room_hosts,
- room_id, user_id):
- """Ask master to reject the invite for the user and room.
-
- Args:
- client (SimpleHttpClient)
- host (str): host of master
- port (int): port on master listening for HTTP replication
- requester (Requester)
- remote_room_hosts (list[str]): Servers to try and reject via
- room_id (str)
- user_id (str)
-
- Returns:
- Deferred
- """
- uri = "http://%s:%s/_synapse/replication/remote_reject_invite" % (host, port)
-
- payload = {
- "requester": requester.serialize(),
- "remote_room_hosts": remote_room_hosts,
- "room_id": room_id,
- "user_id": user_id,
- }
-
- try:
- result = yield client.post_json_get_json(uri, payload)
- except HttpResponseException as e:
- # We convert to SynapseError as we know that it was a SynapseError
- # on the master process that we should send to the client. (And
- # importantly, not stack traces everywhere)
- raise e.to_synapse_error()
- defer.returnValue(result)
-
-
-@defer.inlineCallbacks
-def get_or_register_3pid_guest(client, host, port, requester,
- medium, address, inviter_user_id):
- """Ask the master to get/create a guest account for given 3PID.
-
- Args:
- client (SimpleHttpClient)
- host (str): host of master
- port (int): port on master listening for HTTP replication
- requester (Requester)
- medium (str)
- address (str)
- inviter_user_id (str): The user ID who is trying to invite the
- 3PID
-
- Returns:
- Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
- 3PID guest account.
- """
+ POST /_synapse/replication/remote_join/:room_id/:user_id
- uri = "http://%s:%s/_synapse/replication/get_or_register_3pid_guest" % (host, port)
-
- payload = {
- "requester": requester.serialize(),
- "medium": medium,
- "address": address,
- "inviter_user_id": inviter_user_id,
- }
-
- try:
- result = yield client.post_json_get_json(uri, payload)
- except HttpResponseException as e:
- # We convert to SynapseError as we know that it was a SynapseError
- # on the master process that we should send to the client. (And
- # importantly, not stack traces everywhere)
- raise e.to_synapse_error()
- defer.returnValue(result)
-
-
-@defer.inlineCallbacks
-def notify_user_membership_change(client, host, port, user_id, room_id, change):
- """Notify master that a user has joined or left the room
-
- Args:
- client (SimpleHttpClient)
- host (str): host of master
- port (int): port on master listening for HTTP replication.
- user_id (str)
- room_id (str)
- change (str): Either "join" or "left"
-
- Returns:
- Deferred
+ {
+ "requester": ...,
+ "remote_room_hosts": [...],
+ "content": { ... }
+ }
"""
- assert change in ("joined", "left")
-
- uri = "http://%s:%s/_synapse/replication/user_%s_room" % (host, port, change)
-
- payload = {
- "user_id": user_id,
- "room_id": room_id,
- }
-
- try:
- result = yield client.post_json_get_json(uri, payload)
- except HttpResponseException as e:
- # We convert to SynapseError as we know that it was a SynapseError
- # on the master process that we should send to the client. (And
- # importantly, not stack traces everywhere)
- raise e.to_synapse_error()
- defer.returnValue(result)
-
-class ReplicationRemoteJoinRestServlet(RestServlet):
- PATTERNS = [re.compile("^/_synapse/replication/remote_join$")]
+ NAME = "remote_join"
+ PATH_ARGS = ("room_id", "user_id",)
def __init__(self, hs):
- super(ReplicationRemoteJoinRestServlet, self).__init__()
+ super(ReplicationRemoteJoinRestServlet, self).__init__(hs)
self.federation_handler = hs.get_handlers().federation_handler
self.store = hs.get_datastore()
self.clock = hs.get_clock()
+ @staticmethod
+ def _serialize_payload(requester, room_id, user_id, remote_room_hosts,
+ content):
+ """
+ Args:
+ requester(Requester)
+ room_id (str)
+ user_id (str)
+ remote_room_hosts (list[str]): Servers to try and join via
+ content(dict): The event content to use for the join event
+ """
+ return {
+ "requester": requester.serialize(),
+ "remote_room_hosts": remote_room_hosts,
+ "content": content,
+ }
+
@defer.inlineCallbacks
- def on_POST(self, request):
+ def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"]
- room_id = content["room_id"]
- user_id = content["user_id"]
event_content = content["content"]
requester = Requester.deserialize(self.store, content["requester"])
@@ -212,23 +93,48 @@ class ReplicationRemoteJoinRestServlet(RestServlet):
defer.returnValue((200, {}))
-class ReplicationRemoteRejectInviteRestServlet(RestServlet):
- PATTERNS = [re.compile("^/_synapse/replication/remote_reject_invite$")]
+class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
+ """Rejects the invite for the user and room.
+
+ Request format:
+
+ POST /_synapse/replication/remote_reject_invite/:room_id/:user_id
+
+ {
+ "requester": ...,
+ "remote_room_hosts": [...],
+ }
+ """
+
+ NAME = "remote_reject_invite"
+ PATH_ARGS = ("room_id", "user_id",)
def __init__(self, hs):
- super(ReplicationRemoteRejectInviteRestServlet, self).__init__()
+ super(ReplicationRemoteRejectInviteRestServlet, self).__init__(hs)
self.federation_handler = hs.get_handlers().federation_handler
self.store = hs.get_datastore()
self.clock = hs.get_clock()
+ @staticmethod
+ def _serialize_payload(requester, room_id, user_id, remote_room_hosts):
+ """
+ Args:
+ requester(Requester)
+ room_id (str)
+ user_id (str)
+ remote_room_hosts (list[str]): Servers to try and reject via
+ """
+ return {
+ "requester": requester.serialize(),
+ "remote_room_hosts": remote_room_hosts,
+ }
+
@defer.inlineCallbacks
- def on_POST(self, request):
+ def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"]
- room_id = content["room_id"]
- user_id = content["user_id"]
requester = Requester.deserialize(self.store, content["requester"])
@@ -264,18 +170,50 @@ class ReplicationRemoteRejectInviteRestServlet(RestServlet):
defer.returnValue((200, ret))
-class ReplicationRegister3PIDGuestRestServlet(RestServlet):
- PATTERNS = [re.compile("^/_synapse/replication/get_or_register_3pid_guest$")]
+class ReplicationRegister3PIDGuestRestServlet(ReplicationEndpoint):
+ """Gets/creates a guest account for given 3PID.
+
+ Request format:
+
+ POST /_synapse/replication/get_or_register_3pid_guest/
+
+ {
+ "requester": ...,
+ "medium": ...,
+ "address": ...,
+ "inviter_user_id": ...
+ }
+ """
+
+ NAME = "get_or_register_3pid_guest"
+ PATH_ARGS = ()
def __init__(self, hs):
- super(ReplicationRegister3PIDGuestRestServlet, self).__init__()
+ super(ReplicationRegister3PIDGuestRestServlet, self).__init__(hs)
self.registeration_handler = hs.get_handlers().registration_handler
self.store = hs.get_datastore()
self.clock = hs.get_clock()
+ @staticmethod
+ def _serialize_payload(requester, medium, address, inviter_user_id):
+ """
+ Args:
+ requester(Requester)
+ medium (str)
+ address (str)
+ inviter_user_id (str): The user ID who is trying to invite the
+ 3PID
+ """
+ return {
+ "requester": requester.serialize(),
+ "medium": medium,
+ "address": address,
+ "inviter_user_id": inviter_user_id,
+ }
+
@defer.inlineCallbacks
- def on_POST(self, request):
+ def _handle_request(self, request):
content = parse_json_object_from_request(request)
medium = content["medium"]
@@ -296,23 +234,41 @@ class ReplicationRegister3PIDGuestRestServlet(RestServlet):
defer.returnValue((200, ret))
-class ReplicationUserJoinedLeftRoomRestServlet(RestServlet):
- PATTERNS = [re.compile("^/_synapse/replication/user_(?P<change>joined|left)_room$")]
+class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
+ """Notifies that a user has joined or left the room
+
+ Request format:
+
+ POST /_synapse/replication/membership_change/:room_id/:user_id/:change
+
+ {}
+ """
+
+ NAME = "membership_change"
+ PATH_ARGS = ("room_id", "user_id", "change")
+ CACHE = False # No point caching as should return instantly.
def __init__(self, hs):
- super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__()
+ super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__(hs)
self.registeration_handler = hs.get_handlers().registration_handler
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.distributor = hs.get_distributor()
- def on_POST(self, request, change):
- content = parse_json_object_from_request(request)
+ @staticmethod
+ def _serialize_payload(room_id, user_id, change):
+ """
+ Args:
+ room_id (str)
+ user_id (str)
+ change (str): Either "joined" or "left"
+ """
+ assert change in ("joined", "left",)
- user_id = content["user_id"]
- room_id = content["room_id"]
+ return {}
+ def _handle_request(self, request, room_id, user_id, change):
logger.info("user membership change: %s in %s", user_id, room_id)
user = UserID.from_string(user_id)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index d3509dc288..5b52c91650 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -14,86 +14,26 @@
# limitations under the License.
import logging
-import re
from twisted.internet import defer
-from synapse.api.errors import CodeMessageException, HttpResponseException
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
-from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import Requester, UserID
-from synapse.util.caches.response_cache import ResponseCache
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
-@defer.inlineCallbacks
-def send_event_to_master(clock, store, client, host, port, requester, event, context,
- ratelimit, extra_users):
- """Send event to be handled on the master
-
- Args:
- clock (synapse.util.Clock)
- store (DataStore)
- client (SimpleHttpClient)
- host (str): host of master
- port (int): port on master listening for HTTP replication
- requester (Requester)
- event (FrozenEvent)
- context (EventContext)
- ratelimit (bool)
- extra_users (list(UserID)): Any extra users to notify about event
- """
- uri = "http://%s:%s/_synapse/replication/send_event/%s" % (
- host, port, event.event_id,
- )
-
- serialized_context = yield context.serialize(event, store)
-
- payload = {
- "event": event.get_pdu_json(),
- "internal_metadata": event.internal_metadata.get_dict(),
- "rejected_reason": event.rejected_reason,
- "context": serialized_context,
- "requester": requester.serialize(),
- "ratelimit": ratelimit,
- "extra_users": [u.to_string() for u in extra_users],
- }
-
- try:
- # We keep retrying the same request for timeouts. This is so that we
- # have a good idea that the request has either succeeded or failed on
- # the master, and so whether we should clean up or not.
- while True:
- try:
- result = yield client.put_json(uri, payload)
- break
- except CodeMessageException as e:
- if e.code != 504:
- raise
-
- logger.warn("send_event request timed out")
-
- # If we timed out we probably don't need to worry about backing
- # off too much, but lets just wait a little anyway.
- yield clock.sleep(1)
- except HttpResponseException as e:
- # We convert to SynapseError as we know that it was a SynapseError
- # on the master process that we should send to the client. (And
- # importantly, not stack traces everywhere)
- raise e.to_synapse_error()
- defer.returnValue(result)
-
-
-class ReplicationSendEventRestServlet(RestServlet):
+class ReplicationSendEventRestServlet(ReplicationEndpoint):
"""Handles events newly created on workers, including persisting and
notifying.
The API looks like:
- POST /_synapse/replication/send_event/:event_id
+ POST /_synapse/replication/send_event/:event_id/:txn_id
{
"event": { .. serialized event .. },
@@ -105,27 +45,47 @@ class ReplicationSendEventRestServlet(RestServlet):
"extra_users": [],
}
"""
- PATTERNS = [re.compile("^/_synapse/replication/send_event/(?P<event_id>[^/]+)$")]
+ NAME = "send_event"
+ PATH_ARGS = ("event_id",)
def __init__(self, hs):
- super(ReplicationSendEventRestServlet, self).__init__()
+ super(ReplicationSendEventRestServlet, self).__init__(hs)
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
- # The responses are tiny, so we may as well cache them for a while
- self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
+ @staticmethod
+ @defer.inlineCallbacks
+ def _serialize_payload(event_id, store, event, context, requester,
+ ratelimit, extra_users):
+ """
+ Args:
+ event_id (str)
+ store (DataStore)
+ requester (Requester)
+ event (FrozenEvent)
+ context (EventContext)
+ ratelimit (bool)
+ extra_users (list(UserID)): Any extra users to notify about event
+ """
+
+ serialized_context = yield context.serialize(event, store)
+
+ payload = {
+ "event": event.get_pdu_json(),
+ "internal_metadata": event.internal_metadata.get_dict(),
+ "rejected_reason": event.rejected_reason,
+ "context": serialized_context,
+ "requester": requester.serialize(),
+ "ratelimit": ratelimit,
+ "extra_users": [u.to_string() for u in extra_users],
+ }
- def on_PUT(self, request, event_id):
- return self.response_cache.wrap(
- event_id,
- self._handle_request,
- request
- )
+ defer.returnValue(payload)
@defer.inlineCallbacks
- def _handle_request(self, request):
+ def _handle_request(self, request, event_id):
with Measure(self.clock, "repl_send_event_parse"):
content = parse_json_object_from_request(request)
|