summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/federation.py16
-rw-r--r--synapse/replication/http/federation.py35
2 files changed, 49 insertions, 2 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 37d2307d0a..acabca1d25 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -50,6 +50,7 @@ from synapse.crypto.event_signing import (
 )
 from synapse.events.validator import EventValidator
 from synapse.replication.http.federation import (
+    ReplicationCleanRoomRestServlet,
     ReplicationFederationSendEventsRestServlet,
 )
 from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
@@ -104,6 +105,9 @@ class FederationHandler(BaseHandler):
         self._notify_user_membership_change = (
             ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
         )
+        self._clean_room_for_join_client = (
+            ReplicationCleanRoomRestServlet.make_client(hs)
+        )
 
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
@@ -2388,8 +2392,16 @@ class FederationHandler(BaseHandler):
         )
 
     def _clean_room_for_join(self, room_id):
-        # TODO move this out to master
-        return self.store.clean_room_for_join(room_id)
+        """Called to clean up any data in DB for a given room, ready for the
+        server to join the room.
+
+        Args:
+            room_id (str)
+        """
+        if self.config.worker_app:
+            return self._clean_room_for_join_client(room_id)
+        else:
+            return self.store.clean_room_for_join(room_id)
 
     def user_joined_room(self, user, room_id):
         """Called when a new user has joined the room
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 3e6cbbf5a1..7b0b1cd32e 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -256,7 +256,42 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint):
         defer.returnValue((200, result))
 
 
+class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
+    """Called to clean up any data in DB for a given room, ready for the
+    server to join the room.
+
+    Request format:
+
+        POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id
+
+        {}
+    """
+
+    NAME = "fed_cleanup_room"
+    PATH_ARGS = ("room_id",)
+
+    def __init__(self, hs):
+        super(ReplicationCleanRoomRestServlet, self).__init__(hs)
+
+        self.store = hs.get_datastore()
+
+    @staticmethod
+    def _serialize_payload(room_id, args):
+        """
+        Args:
+            room_id (str)
+        """
+        return {}
+
+    @defer.inlineCallbacks
+    def _handle_request(self, request, room_id):
+        yield self.store.clean_room_for_join(room_id)
+
+        defer.returnValue((200, {}))
+
+
 def register_servlets(hs, http_server):
     ReplicationFederationSendEventsRestServlet(hs).register(http_server)
     ReplicationFederationSendEduRestServlet(hs).register(http_server)
     ReplicationGetQueryRestServlet(hs).register(http_server)
+    ReplicationCleanRoomRestServlet(hs).register(http_server)