summary refs log tree commit diff
path: root/synapse/replication/http
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/http')
-rw-r--r--synapse/replication/http/__init__.py29
-rw-r--r--synapse/replication/http/membership.py334
-rw-r--r--synapse/replication/http/send_event.py165
3 files changed, 528 insertions, 0 deletions
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
new file mode 100644
index 0000000000..589ee94c66
--- /dev/null
+++ b/synapse/replication/http/__init__.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.http.server import JsonResource
+from synapse.replication.http import membership, send_event
+
+REPLICATION_PREFIX = "/_synapse/replication"
+
+
+class ReplicationRestResource(JsonResource):
+    def __init__(self, hs):
+        JsonResource.__init__(self, hs, canonical_json=False)
+        self.register_servlets(hs)
+
+    def register_servlets(self, hs):
+        send_event.register_servlets(hs, self)
+        membership.register_servlets(hs, self)
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
new file mode 100644
index 0000000000..6bfc8a5b89
--- /dev/null
+++ b/synapse/replication/http/membership.py
@@ -0,0 +1,334 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import re
+
+from twisted.internet import defer
+
+from synapse.api.errors import MatrixCodeMessageException, SynapseError
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.types import Requester, UserID
+from synapse.util.distributor import user_joined_room, user_left_room
+
+logger = logging.getLogger(__name__)
+
+
+@defer.inlineCallbacks
+def remote_join(client, host, port, requester, remote_room_hosts,
+                room_id, user_id, content):
+    """Ask the master to do a remote join for the given user to the given room
+
+    Args:
+        client (SimpleHttpClient)
+        host (str): host of master
+        port (int): port on master listening for HTTP replication
+        requester (Requester)
+        remote_room_hosts (list[str]): Servers to try and join via
+        room_id (str)
+        user_id (str)
+        content (dict): The event content to use for the join event
+
+    Returns:
+        Deferred
+    """
+    uri = "http://%s:%s/_synapse/replication/remote_join" % (host, port)
+
+    payload = {
+        "requester": requester.serialize(),
+        "remote_room_hosts": remote_room_hosts,
+        "room_id": room_id,
+        "user_id": user_id,
+        "content": content,
+    }
+
+    try:
+        result = yield client.post_json_get_json(uri, payload)
+    except MatrixCodeMessageException as e:
+        # We convert to SynapseError as we know that it was a SynapseError
+        # on the master process that we should send to the client. (And
+        # importantly, not stack traces everywhere)
+        raise SynapseError(e.code, e.msg, e.errcode)
+    defer.returnValue(result)
+
+
+@defer.inlineCallbacks
+def remote_reject_invite(client, host, port, requester, remote_room_hosts,
+                         room_id, user_id):
+    """Ask master to reject the invite for the user and room.
+
+    Args:
+        client (SimpleHttpClient)
+        host (str): host of master
+        port (int): port on master listening for HTTP replication
+        requester (Requester)
+        remote_room_hosts (list[str]): Servers to try and reject via
+        room_id (str)
+        user_id (str)
+
+    Returns:
+        Deferred
+    """
+    uri = "http://%s:%s/_synapse/replication/remote_reject_invite" % (host, port)
+
+    payload = {
+        "requester": requester.serialize(),
+        "remote_room_hosts": remote_room_hosts,
+        "room_id": room_id,
+        "user_id": user_id,
+    }
+
+    try:
+        result = yield client.post_json_get_json(uri, payload)
+    except MatrixCodeMessageException as e:
+        # We convert to SynapseError as we know that it was a SynapseError
+        # on the master process that we should send to the client. (And
+        # importantly, not stack traces everywhere)
+        raise SynapseError(e.code, e.msg, e.errcode)
+    defer.returnValue(result)
+
+
+@defer.inlineCallbacks
+def get_or_register_3pid_guest(client, host, port, requester,
+                               medium, address, inviter_user_id):
+    """Ask the master to get/create a guest account for given 3PID.
+
+    Args:
+        client (SimpleHttpClient)
+        host (str): host of master
+        port (int): port on master listening for HTTP replication
+        requester (Requester)
+        medium (str)
+        address (str)
+        inviter_user_id (str): The user ID who is trying to invite the
+            3PID
+
+    Returns:
+        Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
+        3PID guest account.
+    """
+
+    uri = "http://%s:%s/_synapse/replication/get_or_register_3pid_guest" % (host, port)
+
+    payload = {
+        "requester": requester.serialize(),
+        "medium": medium,
+        "address": address,
+        "inviter_user_id": inviter_user_id,
+    }
+
+    try:
+        result = yield client.post_json_get_json(uri, payload)
+    except MatrixCodeMessageException as e:
+        # We convert to SynapseError as we know that it was a SynapseError
+        # on the master process that we should send to the client. (And
+        # importantly, not stack traces everywhere)
+        raise SynapseError(e.code, e.msg, e.errcode)
+    defer.returnValue(result)
+
+
+@defer.inlineCallbacks
+def notify_user_membership_change(client, host, port, user_id, room_id, change):
+    """Notify master that a user has joined or left the room
+
+    Args:
+        client (SimpleHttpClient)
+        host (str): host of master
+        port (int): port on master listening for HTTP replication.
+        user_id (str)
+        room_id (str)
+        change (str): Either "join" or "left"
+
+    Returns:
+        Deferred
+    """
+    assert change in ("joined", "left")
+
+    uri = "http://%s:%s/_synapse/replication/user_%s_room" % (host, port, change)
+
+    payload = {
+        "user_id": user_id,
+        "room_id": room_id,
+    }
+
+    try:
+        result = yield client.post_json_get_json(uri, payload)
+    except MatrixCodeMessageException as e:
+        # We convert to SynapseError as we know that it was a SynapseError
+        # on the master process that we should send to the client. (And
+        # importantly, not stack traces everywhere)
+        raise SynapseError(e.code, e.msg, e.errcode)
+    defer.returnValue(result)
+
+
+class ReplicationRemoteJoinRestServlet(RestServlet):
+    PATTERNS = [re.compile("^/_synapse/replication/remote_join$")]
+
+    def __init__(self, hs):
+        super(ReplicationRemoteJoinRestServlet, self).__init__()
+
+        self.federation_handler = hs.get_handlers().federation_handler
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        content = parse_json_object_from_request(request)
+
+        remote_room_hosts = content["remote_room_hosts"]
+        room_id = content["room_id"]
+        user_id = content["user_id"]
+        event_content = content["content"]
+
+        requester = Requester.deserialize(self.store, content["requester"])
+
+        if requester.user:
+            request.authenticated_entity = requester.user.to_string()
+
+        logger.info(
+            "remote_join: %s into room: %s",
+            user_id, room_id,
+        )
+
+        yield self.federation_handler.do_invite_join(
+            remote_room_hosts,
+            room_id,
+            user_id,
+            event_content,
+        )
+
+        defer.returnValue((200, {}))
+
+
+class ReplicationRemoteRejectInviteRestServlet(RestServlet):
+    PATTERNS = [re.compile("^/_synapse/replication/remote_reject_invite$")]
+
+    def __init__(self, hs):
+        super(ReplicationRemoteRejectInviteRestServlet, self).__init__()
+
+        self.federation_handler = hs.get_handlers().federation_handler
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        content = parse_json_object_from_request(request)
+
+        remote_room_hosts = content["remote_room_hosts"]
+        room_id = content["room_id"]
+        user_id = content["user_id"]
+
+        requester = Requester.deserialize(self.store, content["requester"])
+
+        if requester.user:
+            request.authenticated_entity = requester.user.to_string()
+
+        logger.info(
+            "remote_reject_invite: %s out of room: %s",
+            user_id, room_id,
+        )
+
+        try:
+            event = yield self.federation_handler.do_remotely_reject_invite(
+                remote_room_hosts,
+                room_id,
+                user_id,
+            )
+            ret = event.get_pdu_json()
+        except Exception as e:
+            # if we were unable to reject the exception, just mark
+            # it as rejected on our end and plough ahead.
+            #
+            # The 'except' clause is very broad, but we need to
+            # capture everything from DNS failures upwards
+            #
+            logger.warn("Failed to reject invite: %s", e)
+
+            yield self.store.locally_reject_invite(
+                user_id, room_id
+            )
+            ret = {}
+
+        defer.returnValue((200, ret))
+
+
+class ReplicationRegister3PIDGuestRestServlet(RestServlet):
+    PATTERNS = [re.compile("^/_synapse/replication/get_or_register_3pid_guest$")]
+
+    def __init__(self, hs):
+        super(ReplicationRegister3PIDGuestRestServlet, self).__init__()
+
+        self.registeration_handler = hs.get_handlers().registration_handler
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        content = parse_json_object_from_request(request)
+
+        medium = content["medium"]
+        address = content["address"]
+        inviter_user_id = content["inviter_user_id"]
+
+        requester = Requester.deserialize(self.store, content["requester"])
+
+        if requester.user:
+            request.authenticated_entity = requester.user.to_string()
+
+        logger.info("get_or_register_3pid_guest: %r", content)
+
+        ret = yield self.registeration_handler.get_or_register_3pid_guest(
+            medium, address, inviter_user_id,
+        )
+
+        defer.returnValue((200, ret))
+
+
+class ReplicationUserJoinedLeftRoomRestServlet(RestServlet):
+    PATTERNS = [re.compile("^/_synapse/replication/user_(?P<change>joined|left)_room$")]
+
+    def __init__(self, hs):
+        super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__()
+
+        self.registeration_handler = hs.get_handlers().registration_handler
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+        self.distributor = hs.get_distributor()
+
+    def on_POST(self, request, change):
+        content = parse_json_object_from_request(request)
+
+        user_id = content["user_id"]
+        room_id = content["room_id"]
+
+        logger.info("user membership change: %s in %s", user_id, room_id)
+
+        user = UserID.from_string(user_id)
+
+        if change == "joined":
+            user_joined_room(self.distributor, user, room_id)
+        elif change == "left":
+            user_left_room(self.distributor, user, room_id)
+        else:
+            raise Exception("Unrecognized change: %r", change)
+
+        return (200, {})
+
+
+def register_servlets(hs, http_server):
+    ReplicationRemoteJoinRestServlet(hs).register(http_server)
+    ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
+    ReplicationRegister3PIDGuestRestServlet(hs).register(http_server)
+    ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
new file mode 100644
index 0000000000..5227bc333d
--- /dev/null
+++ b/synapse/replication/http/send_event.py
@@ -0,0 +1,165 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import re
+
+from twisted.internet import defer
+
+from synapse.api.errors import (
+    CodeMessageException,
+    MatrixCodeMessageException,
+    SynapseError,
+)
+from synapse.events import FrozenEvent
+from synapse.events.snapshot import EventContext
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.types import Requester, UserID
+from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.metrics import Measure
+
+logger = logging.getLogger(__name__)
+
+
+@defer.inlineCallbacks
+def send_event_to_master(clock, store, client, host, port, requester, event, context,
+                         ratelimit, extra_users):
+    """Send event to be handled on the master
+
+    Args:
+        clock (synapse.util.Clock)
+        store (DataStore)
+        client (SimpleHttpClient)
+        host (str): host of master
+        port (int): port on master listening for HTTP replication
+        requester (Requester)
+        event (FrozenEvent)
+        context (EventContext)
+        ratelimit (bool)
+        extra_users (list(UserID)): Any extra users to notify about event
+    """
+    uri = "http://%s:%s/_synapse/replication/send_event/%s" % (
+        host, port, event.event_id,
+    )
+
+    serialized_context = yield context.serialize(event, store)
+
+    payload = {
+        "event": event.get_pdu_json(),
+        "internal_metadata": event.internal_metadata.get_dict(),
+        "rejected_reason": event.rejected_reason,
+        "context": serialized_context,
+        "requester": requester.serialize(),
+        "ratelimit": ratelimit,
+        "extra_users": [u.to_string() for u in extra_users],
+    }
+
+    try:
+        # We keep retrying the same request for timeouts. This is so that we
+        # have a good idea that the request has either succeeded or failed on
+        # the master, and so whether we should clean up or not.
+        while True:
+            try:
+                result = yield client.put_json(uri, payload)
+                break
+            except CodeMessageException as e:
+                if e.code != 504:
+                    raise
+
+            logger.warn("send_event request timed out")
+
+            # If we timed out we probably don't need to worry about backing
+            # off too much, but lets just wait a little anyway.
+            yield clock.sleep(1)
+    except MatrixCodeMessageException as e:
+        # We convert to SynapseError as we know that it was a SynapseError
+        # on the master process that we should send to the client. (And
+        # importantly, not stack traces everywhere)
+        raise SynapseError(e.code, e.msg, e.errcode)
+    defer.returnValue(result)
+
+
+class ReplicationSendEventRestServlet(RestServlet):
+    """Handles events newly created on workers, including persisting and
+    notifying.
+
+    The API looks like:
+
+        POST /_synapse/replication/send_event/:event_id
+
+        {
+            "event": { .. serialized event .. },
+            "internal_metadata": { .. serialized internal_metadata .. },
+            "rejected_reason": ..,   // The event.rejected_reason field
+            "context": { .. serialized event context .. },
+            "requester": { .. serialized requester .. },
+            "ratelimit": true,
+            "extra_users": [],
+        }
+    """
+    PATTERNS = [re.compile("^/_synapse/replication/send_event/(?P<event_id>[^/]+)$")]
+
+    def __init__(self, hs):
+        super(ReplicationSendEventRestServlet, self).__init__()
+
+        self.event_creation_handler = hs.get_event_creation_handler()
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+        # The responses are tiny, so we may as well cache them for a while
+        self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
+
+    def on_PUT(self, request, event_id):
+        return self.response_cache.wrap(
+            event_id,
+            self._handle_request,
+            request
+        )
+
+    @defer.inlineCallbacks
+    def _handle_request(self, request):
+        with Measure(self.clock, "repl_send_event_parse"):
+            content = parse_json_object_from_request(request)
+
+            event_dict = content["event"]
+            internal_metadata = content["internal_metadata"]
+            rejected_reason = content["rejected_reason"]
+            event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
+
+            requester = Requester.deserialize(self.store, content["requester"])
+            context = yield EventContext.deserialize(self.store, content["context"])
+
+            ratelimit = content["ratelimit"]
+            extra_users = [UserID.from_string(u) for u in content["extra_users"]]
+
+        if requester.user:
+            request.authenticated_entity = requester.user.to_string()
+
+        logger.info(
+            "Got event to send with ID: %s into room: %s",
+            event.event_id, event.room_id,
+        )
+
+        yield self.event_creation_handler.persist_and_notify_client_event(
+            requester, event, context,
+            ratelimit=ratelimit,
+            extra_users=extra_users,
+        )
+
+        defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+    ReplicationSendEventRestServlet(hs).register(http_server)