diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index dd527e807f..cb4a52dbe9 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -21,6 +21,7 @@ from synapse.replication.http import (
login,
membership,
presence,
+ push,
register,
send_event,
streams,
@@ -42,6 +43,7 @@ class ReplicationRestResource(JsonResource):
membership.register_servlets(hs, self)
streams.register_servlets(hs, self)
account_data.register_servlets(hs, self)
+ push.register_servlets(hs, self)
# The following can't currently be instantiated on workers.
if hs.config.worker.worker_app is None:
diff --git a/synapse/replication/http/push.py b/synapse/replication/http/push.py
new file mode 100644
index 0000000000..054ed64d34
--- /dev/null
+++ b/synapse/replication/http/push.py
@@ -0,0 +1,72 @@
+# -*- coding: utf-8 -*-
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import TYPE_CHECKING
+
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
+ """Deletes the given pusher.
+
+ Request format:
+
+ POST /_synapse/replication/remove_pusher/:user_id
+
+ {
+ "app_id": "<some_id>",
+ "pushkey": "<some_key>"
+ }
+
+ """
+
+ NAME = "add_user_account_data"
+ PATH_ARGS = ("user_id",)
+ CACHE = False
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+
+ self.pusher_pool = hs.get_pusherpool()
+
+ @staticmethod
+ async def _serialize_payload(app_id, pushkey, user_id):
+ payload = {
+ "app_id": app_id,
+ "pushkey": pushkey,
+ }
+
+ return payload
+
+ async def _handle_request(self, request, user_id):
+ content = parse_json_object_from_request(request)
+
+ app_id = content["app_id"]
+ pushkey = content["pushkey"]
+
+ await self.pusher_pool.remove_pusher(app_id, pushkey, user_id)
+
+ return 200, {}
+
+
+def register_servlets(hs, http_server):
+ ReplicationRemovePusherRestServlet(hs).register(http_server)
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 0a9da79c32..bb447f75b4 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -325,31 +325,6 @@ class FederationAckCommand(Command):
return "%s %s" % (self.instance_name, self.token)
-class RemovePusherCommand(Command):
- """Sent by the client to request the master remove the given pusher.
-
- Format::
-
- REMOVE_PUSHER <app_id> <push_key> <user_id>
- """
-
- NAME = "REMOVE_PUSHER"
-
- def __init__(self, app_id, push_key, user_id):
- self.user_id = user_id
- self.app_id = app_id
- self.push_key = push_key
-
- @classmethod
- def from_line(cls, line):
- app_id, push_key, user_id = line.split(" ", 2)
-
- return cls(app_id, push_key, user_id)
-
- def to_line(self):
- return " ".join((self.app_id, self.push_key, self.user_id))
-
-
class UserIpCommand(Command):
"""Sent periodically when a worker sees activity from a client.
@@ -416,7 +391,6 @@ _COMMANDS = (
ReplicateCommand,
UserSyncCommand,
FederationAckCommand,
- RemovePusherCommand,
UserIpCommand,
RemoteServerUpCommand,
ClearUserSyncsCommand,
@@ -443,7 +417,6 @@ VALID_CLIENT_COMMANDS = (
UserSyncCommand.NAME,
ClearUserSyncsCommand.NAME,
FederationAckCommand.NAME,
- RemovePusherCommand.NAME,
UserIpCommand.NAME,
ErrorCommand.NAME,
RemoteServerUpCommand.NAME,
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index d1d00c3717..a7245da152 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -44,7 +44,6 @@ from synapse.replication.tcp.commands import (
PositionCommand,
RdataCommand,
RemoteServerUpCommand,
- RemovePusherCommand,
ReplicateCommand,
UserIpCommand,
UserSyncCommand,
@@ -373,23 +372,6 @@ class ReplicationCommandHandler:
if self._federation_sender:
self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
- def on_REMOVE_PUSHER(
- self, conn: AbstractConnection, cmd: RemovePusherCommand
- ) -> Optional[Awaitable[None]]:
- remove_pusher_counter.inc()
-
- if self._is_master:
- return self._handle_remove_pusher(cmd)
- else:
- return None
-
- async def _handle_remove_pusher(self, cmd: RemovePusherCommand):
- await self._store.delete_pusher_by_app_id_pushkey_user_id(
- app_id=cmd.app_id, pushkey=cmd.push_key, user_id=cmd.user_id
- )
-
- self._notifier.on_new_replication_data()
-
def on_USER_IP(
self, conn: AbstractConnection, cmd: UserIpCommand
) -> Optional[Awaitable[None]]:
@@ -684,11 +666,6 @@ class ReplicationCommandHandler:
UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
)
- def send_remove_pusher(self, app_id: str, push_key: str, user_id: str):
- """Poke the master to remove a pusher for a user"""
- cmd = RemovePusherCommand(app_id, push_key, user_id)
- self.send_command(cmd)
-
def send_user_ip(
self,
user_id: str,
|