summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rwxr-xr-xsynapse/app/homeserver.py4
-rw-r--r--synapse/config/workers.py8
-rw-r--r--synapse/events/snapshot.py72
-rw-r--r--synapse/handlers/message.py16
-rw-r--r--synapse/replication/http/__init__.py31
-rw-r--r--synapse/replication/http/send_event.py108
-rw-r--r--synapse/storage/appservice.py13
-rw-r--r--synapse/types.py65
8 files changed, 304 insertions, 13 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index cb82a415a6..e375f2bbcf 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -38,6 +38,7 @@ from synapse.metrics import register_memory_metrics
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
     check_requirements
+from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 from synapse.rest import ClientRestResource
 from synapse.rest.key.v1.server_key_resource import LocalKey
@@ -219,6 +220,9 @@ class SynapseHomeServer(HomeServer):
         if name == "metrics" and self.get_config().enable_metrics:
             resources[METRICS_PREFIX] = MetricsResource(self)
 
+        if name == "replication":
+            resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
+
         return resources
 
     def start_listening(self):
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 4b6884918d..80baf0ce0e 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -33,8 +33,16 @@ class WorkerConfig(Config):
         self.worker_pid_file = config.get("worker_pid_file")
         self.worker_log_file = config.get("worker_log_file")
         self.worker_log_config = config.get("worker_log_config")
+
+        # The host used to connect to the main synapse
         self.worker_replication_host = config.get("worker_replication_host", None)
+
+        # The port on the main synapse for TCP replication
         self.worker_replication_port = config.get("worker_replication_port", None)
+
+        # The port on the main synapse for HTTP replication endpoint
+        self.worker_replication_http_port = config.get("worker_replication_http_port")
+
         self.worker_name = config.get("worker_name", self.worker_app)
 
         self.worker_main_http_uri = config.get("worker_main_http_uri", None)
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index 87e3fe7b97..7b80444f73 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -14,6 +14,9 @@
 # limitations under the License.
 
 
+from frozendict import frozendict
+
+
 class EventContext(object):
     """
     Attributes:
@@ -73,3 +76,72 @@ class EventContext(object):
         self.prev_state_events = None
 
         self.app_service = None
+
+    def serialize(self):
+        """Converts self to a type that can be serialized as JSON, and then
+        deserialized by `deserialize`
+
+        Returns:
+            dict
+        """
+        return {
+            "current_state_ids": _encode_state_dict(self.current_state_ids),
+            "prev_state_ids": _encode_state_dict(self.prev_state_ids),
+            "state_group": self.state_group,
+            "rejected": self.rejected,
+            "push_actions": self.push_actions,
+            "prev_group": self.prev_group,
+            "delta_ids": _encode_state_dict(self.delta_ids),
+            "prev_state_events": self.prev_state_events,
+            "app_service_id": self.app_service.id if self.app_service else None
+        }
+
+    @staticmethod
+    def deserialize(store, input):
+        """Converts a dict that was produced by `serialize` back into a
+        EventContext.
+
+        Args:
+            store (DataStore): Used to convert AS ID to AS object
+            input (dict): A dict produced by `serialize`
+
+        Returns:
+            EventContext
+        """
+        context = EventContext()
+        context.current_state_ids = _decode_state_dict(input["current_state_ids"])
+        context.prev_state_ids = _decode_state_dict(input["prev_state_ids"])
+        context.state_group = input["state_group"]
+        context.rejected = input["rejected"]
+        context.push_actions = input["push_actions"]
+        context.prev_group = input["prev_group"]
+        context.delta_ids = _decode_state_dict(input["delta_ids"])
+        context.prev_state_events = input["prev_state_events"]
+
+        app_service_id = input["app_service_id"]
+        if app_service_id:
+            context.app_service = store.get_app_service_by_id(app_service_id)
+
+        return context
+
+
+def _encode_state_dict(state_dict):
+    """Since dicts of (type, state_key) -> event_id cannot be serialized in
+    JSON we need to convert them to a form that can.
+    """
+    if state_dict is None:
+        return None
+
+    return [
+        (etype, state_key, v)
+        for (etype, state_key), v in state_dict.iteritems()
+    ]
+
+
+def _decode_state_dict(input):
+    """Decodes a state dict encoded using `_encode_state_dict` above
+    """
+    if input is None:
+        return None
+
+    return frozendict({(etype, state_key,): v for etype, state_key, v in input})
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a58fc37fff..92c153f300 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -28,6 +28,7 @@ from synapse.util.logcontext import preserve_fn
 from synapse.util.metrics import measure_func
 from synapse.util.frozenutils import unfreeze
 from synapse.visibility import filter_events_for_client
+from synapse.replication.http.send_event import send_event_to_master
 
 from ._base import BaseHandler
 
@@ -312,6 +313,9 @@ class EventCreationHandler(object):
         self.server_name = hs.hostname
         self.ratelimiter = hs.get_ratelimiter()
         self.notifier = hs.get_notifier()
+        self.config = hs.config
+
+        self.http_client = hs.get_simple_http_client()
 
         # This is only used to get at ratelimit function, and maybe_kick_guest_users
         self.base_handler = BaseHandler(hs)
@@ -559,6 +563,18 @@ class EventCreationHandler(object):
     ):
         # We now need to go and hit out to wherever we need to hit out to.
 
+        # If we're a worker we need to hit out to the master.
+        if self.config.worker_app:
+            yield send_event_to_master(
+                self.http_client,
+                host=self.config.worker_replication_host,
+                port=self.config.worker_replication_http_port,
+                requester=requester,
+                event=event,
+                context=context,
+            )
+            return
+
         if ratelimit:
             yield self.base_handler.ratelimit(requester)
 
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
new file mode 100644
index 0000000000..b378b41646
--- /dev/null
+++ b/synapse/replication/http/__init__.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import send_event
+
+from synapse.http.server import JsonResource
+
+
+REPLICATION_PREFIX = "/_synapse/replication"
+
+
+class ReplicationRestResource(JsonResource):
+    def __init__(self, hs):
+        JsonResource.__init__(self, hs, canonical_json=False)
+        self.register_servlets(hs)
+
+    def register_servlets(self, hs):
+        send_event.register_servlets(hs, self)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
new file mode 100644
index 0000000000..ff9b9d2f10
--- /dev/null
+++ b/synapse/replication/http/send_event.py
@@ -0,0 +1,108 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.events import FrozenEvent
+from synapse.events.snapshot import EventContext
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.util.metrics import Measure
+from synapse.types import Requester
+
+import logging
+import re
+
+logger = logging.getLogger(__name__)
+
+
+def send_event_to_master(client, host, port, requester, event, context):
+    """Send event to be handled on the master
+
+    Args:
+        client (SimpleHttpClient)
+        host (str): host of master
+        port (int): port on master listening for HTTP replication
+        requester (Requester)
+        event (FrozenEvent)
+        context (EventContext)
+    """
+    uri = "http://%s:%s/_synapse/replication/send_event" % (host, port,)
+
+    payload = {
+        "event": event.get_pdu_json(),
+        "internal_metadata": event.internal_metadata.get_dict(),
+        "rejected_reason": event.rejected_reason,
+        "context": context.serialize(),
+        "requester": requester.serialize(),
+    }
+
+    return client.post_json_get_json(uri, payload)
+
+
+class ReplicationSendEventRestServlet(RestServlet):
+    """Handles events newly created on workers, including persisting and
+    notifying.
+
+    The API looks like:
+
+        POST /_synapse/replication/send_event
+
+        {
+            "event": { .. serialized event .. },
+            "internal_metadata": { .. serialized internal_metadata .. },
+            "rejected_reason": ..,   // The event.rejected_reason field
+            "context": { .. serialized event context .. },
+            "requester": { .. serialized requester .. },
+        }
+    """
+    PATTERNS = [re.compile("^/_synapse/replication/send_event$")]
+
+    def __init__(self, hs):
+        super(ReplicationSendEventRestServlet, self).__init__()
+
+        self.event_creation_handler = hs.get_event_creation_handler()
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        with Measure(self.clock, "repl_send_event_parse"):
+            content = parse_json_object_from_request(request)
+
+            event_dict = content["event"]
+            internal_metadata = content["internal_metadata"]
+            rejected_reason = content["rejected_reason"]
+            event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
+
+            requester = Requester.deserialize(self.store, content["requester"])
+            context = EventContext.deserialize(self.store, content["context"])
+
+        if requester.user:
+            request.authenticated_entity = requester.user.to_string()
+
+        logger.info(
+            "Got event to send with ID: %s into room: %s",
+            event.event_id, event.room_id,
+        )
+
+        yield self.event_creation_handler.handle_new_client_event(
+            requester, event, context,
+        )
+
+        defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+    ReplicationSendEventRestServlet(hs).register(http_server)
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index d8c84b7141..79673b4273 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -99,6 +99,19 @@ class ApplicationServiceStore(SQLBaseStore):
                 return service
         return None
 
+    def get_app_service_by_id(self, as_id):
+        """Get the application service with the given appservice ID.
+
+        Args:
+            as_id (str): The application service ID.
+        Returns:
+            synapse.appservice.ApplicationService or None.
+        """
+        for service in self.services_cache:
+            if service.id == as_id:
+                return service
+        return None
+
     def get_app_service_rooms(self, service):
         """Get a list of RoomsForUser for this application service.
 
diff --git a/synapse/types.py b/synapse/types.py
index 6e76c016d9..7cb24cecb2 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -19,20 +19,59 @@ from synapse.api.errors import SynapseError
 from collections import namedtuple
 
 
-Requester = namedtuple("Requester", [
+class Requester(namedtuple("Requester", [
     "user", "access_token_id", "is_guest", "device_id", "app_service",
-])
-"""
-Represents the user making a request
-
-Attributes:
-    user (UserID):  id of the user making the request
-    access_token_id (int|None):  *ID* of the access token used for this
-        request, or None if it came via the appservice API or similar
-    is_guest (bool):  True if the user making this request is a guest user
-    device_id (str|None):  device_id which was set at authentication time
-    app_service (ApplicationService|None):  the AS requesting on behalf of the user
-"""
+])):
+    """
+    Represents the user making a request
+
+    Attributes:
+        user (UserID):  id of the user making the request
+        access_token_id (int|None):  *ID* of the access token used for this
+            request, or None if it came via the appservice API or similar
+        is_guest (bool):  True if the user making this request is a guest user
+        device_id (str|None):  device_id which was set at authentication time
+        app_service (ApplicationService|None):  the AS requesting on behalf of the user
+    """
+
+    def serialize(self):
+        """Converts self to a type that can be serialized as JSON, and then
+        deserialized by `deserialize`
+
+        Returns:
+            dict
+        """
+        return {
+            "user_id": self.user.to_string(),
+            "access_token_id": self.access_token_id,
+            "is_guest": self.is_guest,
+            "device_id": self.device_id,
+            "app_server_id": self.app_service.id if self.app_service else None,
+        }
+
+    @staticmethod
+    def deserialize(store, input):
+        """Converts a dict that was produced by `serialize` back into a
+        Requester.
+
+        Args:
+            store (DataStore): Used to convert AS ID to AS object
+            input (dict): A dict produced by `serialize`
+
+        Returns:
+            Requester
+        """
+        appservice = None
+        if input["app_server_id"]:
+            appservice = store.get_app_service_by_id(input["app_server_id"])
+
+        return Requester(
+            user=UserID.from_string(input["user_id"]),
+            access_token_id=input["access_token_id"],
+            is_guest=input["is_guest"],
+            device_id=input["device_id"],
+            app_service=appservice,
+        )
 
 
 def create_requester(user_id, access_token_id=None, is_guest=False,