summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--changelog.d/9465.bugfix1
-rw-r--r--docs/tcp_replication.md4
-rw-r--r--synapse/app/generic_worker.py3
-rw-r--r--synapse/push/httppusher.py3
-rw-r--r--synapse/push/pusherpool.py23
-rw-r--r--synapse/replication/http/__init__.py2
-rw-r--r--synapse/replication/http/push.py72
-rw-r--r--synapse/replication/tcp/commands.py27
-rw-r--r--synapse/replication/tcp/handler.py23
-rw-r--r--synapse/server.py3
10 files changed, 94 insertions, 67 deletions
diff --git a/changelog.d/9465.bugfix b/changelog.d/9465.bugfix
new file mode 100644
index 0000000000..2ab4f315c1
--- /dev/null
+++ b/changelog.d/9465.bugfix
@@ -0,0 +1 @@
+Fix deleting pushers when using sharded pushers.
diff --git a/docs/tcp_replication.md b/docs/tcp_replication.md
index ad145439b4..15df949deb 100644
--- a/docs/tcp_replication.md
+++ b/docs/tcp_replication.md
@@ -220,10 +220,6 @@ Asks the server for the current position of all streams.
 
    Acknowledge receipt of some federation data
 
-#### REMOVE_PUSHER (C)
-
-   Inform the server a pusher should be removed
-
 ### REMOTE_SERVER_UP (S, C)
 
    Inform other processes that a remote server may have come back online.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 6526acb2f2..6ed405e66b 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -645,9 +645,6 @@ class GenericWorkerServer(HomeServer):
 
         self.get_tcp_replication().start_replication(self)
 
-    async def remove_pusher(self, app_id, push_key, user_id):
-        self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
-
     @cache_in_self
     def get_replication_data_handler(self):
         return GenericWorkerReplicationHandler(self)
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index b9d3da2e0a..f4d7e199e9 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -74,6 +74,7 @@ class HttpPusher(Pusher):
         self.timed_call = None
         self._is_processing = False
         self._group_unread_count_by_room = hs.config.push_group_unread_count_by_room
+        self._pusherpool = hs.get_pusherpool()
 
         self.data = pusher_config.data
         if self.data is None:
@@ -299,7 +300,7 @@ class HttpPusher(Pusher):
                     )
                 else:
                     logger.info("Pushkey %s was rejected: removing", pk)
-                    await self.hs.remove_pusher(self.app_id, pk, self.user_id)
+                    await self._pusherpool.remove_pusher(self.app_id, pk, self.user_id)
         return True
 
     async def _build_notification_dict(
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index ae1145be0e..3936bf8784 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -25,6 +25,7 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.push import Pusher, PusherConfig, PusherConfigException
 from synapse.push.pusher import PusherFactory
+from synapse.replication.http.push import ReplicationRemovePusherRestServlet
 from synapse.types import JsonDict, RoomStreamToken
 from synapse.util.async_helpers import concurrently_execute
 
@@ -68,6 +69,13 @@ class PusherPool:
         self._pusher_shard_config = hs.config.push.pusher_shard_config
         self._instance_name = hs.get_instance_name()
 
+        # We can only delete pushers on master.
+        self._remove_pusher_client = None
+        if hs.config.worker.worker_app:
+            self._remove_pusher_client = ReplicationRemovePusherRestServlet.make_client(
+                hs
+            )
+
         # Record the last stream ID that we were poked about so we can get
         # changes since then. We set this to the current max stream ID on
         # startup as every individual pusher will have checked for changes on
@@ -175,9 +183,6 @@ class PusherPool:
             user_id: user to remove pushers for
             access_tokens: access token *ids* to remove pushers for
         """
-        if not self._pusher_shard_config.should_handle(self._instance_name, user_id):
-            return
-
         tokens = set(access_tokens)
         for p in await self.store.get_pushers_by_user_id(user_id):
             if p.access_token in tokens:
@@ -380,6 +385,12 @@ class PusherPool:
 
             synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
 
-        await self.store.delete_pusher_by_app_id_pushkey_user_id(
-            app_id, pushkey, user_id
-        )
+        # We can only delete pushers on master.
+        if self._remove_pusher_client:
+            await self._remove_pusher_client(
+                app_id=app_id, pushkey=pushkey, user_id=user_id
+            )
+        else:
+            await self.store.delete_pusher_by_app_id_pushkey_user_id(
+                app_id, pushkey, user_id
+            )
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index dd527e807f..cb4a52dbe9 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -21,6 +21,7 @@ from synapse.replication.http import (
     login,
     membership,
     presence,
+    push,
     register,
     send_event,
     streams,
@@ -42,6 +43,7 @@ class ReplicationRestResource(JsonResource):
         membership.register_servlets(hs, self)
         streams.register_servlets(hs, self)
         account_data.register_servlets(hs, self)
+        push.register_servlets(hs, self)
 
         # The following can't currently be instantiated on workers.
         if hs.config.worker.worker_app is None:
diff --git a/synapse/replication/http/push.py b/synapse/replication/http/push.py
new file mode 100644
index 0000000000..054ed64d34
--- /dev/null
+++ b/synapse/replication/http/push.py
@@ -0,0 +1,72 @@
+# -*- coding: utf-8 -*-
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import TYPE_CHECKING
+
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
+    """Deletes the given pusher.
+
+    Request format:
+
+        POST /_synapse/replication/remove_pusher/:user_id
+
+        {
+            "app_id": "<some_id>",
+            "pushkey": "<some_key>"
+        }
+
+    """
+
+    NAME = "add_user_account_data"
+    PATH_ARGS = ("user_id",)
+    CACHE = False
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        self.pusher_pool = hs.get_pusherpool()
+
+    @staticmethod
+    async def _serialize_payload(app_id, pushkey, user_id):
+        payload = {
+            "app_id": app_id,
+            "pushkey": pushkey,
+        }
+
+        return payload
+
+    async def _handle_request(self, request, user_id):
+        content = parse_json_object_from_request(request)
+
+        app_id = content["app_id"]
+        pushkey = content["pushkey"]
+
+        await self.pusher_pool.remove_pusher(app_id, pushkey, user_id)
+
+        return 200, {}
+
+
+def register_servlets(hs, http_server):
+    ReplicationRemovePusherRestServlet(hs).register(http_server)
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 0a9da79c32..bb447f75b4 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -325,31 +325,6 @@ class FederationAckCommand(Command):
         return "%s %s" % (self.instance_name, self.token)
 
 
-class RemovePusherCommand(Command):
-    """Sent by the client to request the master remove the given pusher.
-
-    Format::
-
-        REMOVE_PUSHER <app_id> <push_key> <user_id>
-    """
-
-    NAME = "REMOVE_PUSHER"
-
-    def __init__(self, app_id, push_key, user_id):
-        self.user_id = user_id
-        self.app_id = app_id
-        self.push_key = push_key
-
-    @classmethod
-    def from_line(cls, line):
-        app_id, push_key, user_id = line.split(" ", 2)
-
-        return cls(app_id, push_key, user_id)
-
-    def to_line(self):
-        return " ".join((self.app_id, self.push_key, self.user_id))
-
-
 class UserIpCommand(Command):
     """Sent periodically when a worker sees activity from a client.
 
@@ -416,7 +391,6 @@ _COMMANDS = (
     ReplicateCommand,
     UserSyncCommand,
     FederationAckCommand,
-    RemovePusherCommand,
     UserIpCommand,
     RemoteServerUpCommand,
     ClearUserSyncsCommand,
@@ -443,7 +417,6 @@ VALID_CLIENT_COMMANDS = (
     UserSyncCommand.NAME,
     ClearUserSyncsCommand.NAME,
     FederationAckCommand.NAME,
-    RemovePusherCommand.NAME,
     UserIpCommand.NAME,
     ErrorCommand.NAME,
     RemoteServerUpCommand.NAME,
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index d1d00c3717..a7245da152 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -44,7 +44,6 @@ from synapse.replication.tcp.commands import (
     PositionCommand,
     RdataCommand,
     RemoteServerUpCommand,
-    RemovePusherCommand,
     ReplicateCommand,
     UserIpCommand,
     UserSyncCommand,
@@ -373,23 +372,6 @@ class ReplicationCommandHandler:
         if self._federation_sender:
             self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
 
-    def on_REMOVE_PUSHER(
-        self, conn: AbstractConnection, cmd: RemovePusherCommand
-    ) -> Optional[Awaitable[None]]:
-        remove_pusher_counter.inc()
-
-        if self._is_master:
-            return self._handle_remove_pusher(cmd)
-        else:
-            return None
-
-    async def _handle_remove_pusher(self, cmd: RemovePusherCommand):
-        await self._store.delete_pusher_by_app_id_pushkey_user_id(
-            app_id=cmd.app_id, pushkey=cmd.push_key, user_id=cmd.user_id
-        )
-
-        self._notifier.on_new_replication_data()
-
     def on_USER_IP(
         self, conn: AbstractConnection, cmd: UserIpCommand
     ) -> Optional[Awaitable[None]]:
@@ -684,11 +666,6 @@ class ReplicationCommandHandler:
             UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
         )
 
-    def send_remove_pusher(self, app_id: str, push_key: str, user_id: str):
-        """Poke the master to remove a pusher for a user"""
-        cmd = RemovePusherCommand(app_id, push_key, user_id)
-        self.send_command(cmd)
-
     def send_user_ip(
         self,
         user_id: str,
diff --git a/synapse/server.py b/synapse/server.py
index 6b3892e3cd..5de8782000 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -758,9 +758,6 @@ class HomeServer(metaclass=abc.ABCMeta):
             reconnect=True,
         )
 
-    async def remove_pusher(self, app_id: str, push_key: str, user_id: str):
-        return await self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
-
     def should_send_federation(self) -> bool:
         "Should this server be sending federation traffic directly?"
         return self.config.send_federation and (