summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/__init__.py4
-rw-r--r--synapse/replication/http/_base.py3
-rw-r--r--synapse/replication/http/membership.py40
-rw-r--r--synapse/replication/http/presence.py116
-rw-r--r--synapse/replication/tcp/handler.py10
5 files changed, 171 insertions, 2 deletions
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index a909744e93..19b69e0e11 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -19,6 +19,7 @@ from synapse.replication.http import (
     federation,
     login,
     membership,
+    presence,
     register,
     send_event,
     streams,
@@ -35,10 +36,11 @@ class ReplicationRestResource(JsonResource):
     def register_servlets(self, hs):
         send_event.register_servlets(hs, self)
         federation.register_servlets(hs, self)
+        presence.register_servlets(hs, self)
+        membership.register_servlets(hs, self)
 
         # The following can't currently be instantiated on workers.
         if hs.config.worker.worker_app is None:
-            membership.register_servlets(hs, self)
             login.register_servlets(hs, self)
             register.register_servlets(hs, self)
             devices.register_servlets(hs, self)
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index c3136a4eb9..793cef6c26 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -142,6 +142,7 @@ class ReplicationEndpoint(object):
         """
         clock = hs.get_clock()
         client = hs.get_simple_http_client()
+        local_instance_name = hs.get_instance_name()
 
         master_host = hs.config.worker_replication_host
         master_port = hs.config.worker_replication_http_port
@@ -151,6 +152,8 @@ class ReplicationEndpoint(object):
         @trace(opname="outgoing_replication_request")
         @defer.inlineCallbacks
         def send_request(instance_name="master", **kwargs):
+            if instance_name == local_instance_name:
+                raise Exception("Trying to send HTTP request to self")
             if instance_name == "master":
                 host = master_host
                 port = master_port
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index 050fd34562..a7174c4a8f 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -14,12 +14,16 @@
 # limitations under the License.
 
 import logging
+from typing import TYPE_CHECKING
 
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
 from synapse.types import Requester, UserID
 from synapse.util.distributor import user_joined_room, user_left_room
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -106,6 +110,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
         self.federation_handler = hs.get_handlers().federation_handler
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
+        self.member_handler = hs.get_room_member_handler()
 
     @staticmethod
     def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content):
@@ -149,12 +154,44 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
             #
             logger.warning("Failed to reject invite: %s", e)
 
-            stream_id = await self.store.locally_reject_invite(user_id, room_id)
+            stream_id = await self.member_handler.locally_reject_invite(
+                user_id, room_id
+            )
             event_id = None
 
         return 200, {"event_id": event_id, "stream_id": stream_id}
 
 
+class ReplicationLocallyRejectInviteRestServlet(ReplicationEndpoint):
+    """Rejects the invite for the user and room locally.
+
+    Request format:
+
+        POST /_synapse/replication/locally_reject_invite/:room_id/:user_id
+
+        {}
+    """
+
+    NAME = "locally_reject_invite"
+    PATH_ARGS = ("room_id", "user_id")
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        self.member_handler = hs.get_room_member_handler()
+
+    @staticmethod
+    def _serialize_payload(room_id, user_id):
+        return {}
+
+    async def _handle_request(self, request, room_id, user_id):
+        logger.info("locally_reject_invite: %s out of room: %s", user_id, room_id)
+
+        stream_id = await self.member_handler.locally_reject_invite(user_id, room_id)
+
+        return 200, {"stream_id": stream_id}
+
+
 class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
     """Notifies that a user has joined or left the room
 
@@ -208,3 +245,4 @@ def register_servlets(hs, http_server):
     ReplicationRemoteJoinRestServlet(hs).register(http_server)
     ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
     ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)
+    ReplicationLocallyRejectInviteRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py
new file mode 100644
index 0000000000..ea1b33331b
--- /dev/null
+++ b/synapse/replication/http/presence.py
@@ -0,0 +1,116 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import TYPE_CHECKING
+
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import UserID
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationBumpPresenceActiveTime(ReplicationEndpoint):
+    """We've seen the user do something that indicates they're interacting
+    with the app.
+
+    The POST looks like:
+
+        POST /_synapse/replication/bump_presence_active_time/<user_id>
+
+        200 OK
+
+        {}
+    """
+
+    NAME = "bump_presence_active_time"
+    PATH_ARGS = ("user_id",)
+    METHOD = "POST"
+    CACHE = False
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        self._presence_handler = hs.get_presence_handler()
+
+    @staticmethod
+    def _serialize_payload(user_id):
+        return {}
+
+    async def _handle_request(self, request, user_id):
+        await self._presence_handler.bump_presence_active_time(
+            UserID.from_string(user_id)
+        )
+
+        return (
+            200,
+            {},
+        )
+
+
+class ReplicationPresenceSetState(ReplicationEndpoint):
+    """Set the presence state for a user.
+
+    The POST looks like:
+
+        POST /_synapse/replication/presence_set_state/<user_id>
+
+        {
+            "state": { ... },
+            "ignore_status_msg": false,
+        }
+
+        200 OK
+
+        {}
+    """
+
+    NAME = "presence_set_state"
+    PATH_ARGS = ("user_id",)
+    METHOD = "POST"
+    CACHE = False
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        self._presence_handler = hs.get_presence_handler()
+
+    @staticmethod
+    def _serialize_payload(user_id, state, ignore_status_msg=False):
+        return {
+            "state": state,
+            "ignore_status_msg": ignore_status_msg,
+        }
+
+    async def _handle_request(self, request, user_id):
+        content = parse_json_object_from_request(request)
+
+        await self._presence_handler.set_state(
+            UserID.from_string(user_id), content["state"], content["ignore_status_msg"]
+        )
+
+        return (
+            200,
+            {},
+        )
+
+
+def register_servlets(hs, http_server):
+    ReplicationBumpPresenceActiveTime(hs).register(http_server)
+    ReplicationPresenceSetState(hs).register(http_server)
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index acfa66a7a8..03300e5336 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -38,7 +38,9 @@ from synapse.replication.tcp.commands import (
 from synapse.replication.tcp.protocol import AbstractConnection
 from synapse.replication.tcp.streams import (
     STREAMS_MAP,
+    BackfillStream,
     CachesStream,
+    EventsStream,
     FederationStream,
     Stream,
 )
@@ -87,6 +89,14 @@ class ReplicationCommandHandler:
                 self._streams_to_replicate.append(stream)
                 continue
 
+            if isinstance(stream, (EventsStream, BackfillStream)):
+                # Only add EventStream and BackfillStream as a source on the
+                # instance in charge of event persistence.
+                if hs.config.worker.writers.events == hs.get_instance_name():
+                    self._streams_to_replicate.append(stream)
+
+                continue
+
             # Only add any other streams if we're on master.
             if hs.config.worker_app is not None:
                 continue