diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index ae1145be0e..3936bf8784 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -25,6 +25,7 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.push.pusher import PusherFactory
+from synapse.replication.http.push import ReplicationRemovePusherRestServlet
from synapse.types import JsonDict, RoomStreamToken
from synapse.util.async_helpers import concurrently_execute
@@ -68,6 +69,13 @@ class PusherPool:
self._pusher_shard_config = hs.config.push.pusher_shard_config
self._instance_name = hs.get_instance_name()
+ # We can only delete pushers on master.
+ self._remove_pusher_client = None
+ if hs.config.worker.worker_app:
+ self._remove_pusher_client = ReplicationRemovePusherRestServlet.make_client(
+ hs
+ )
+
# Record the last stream ID that we were poked about so we can get
# changes since then. We set this to the current max stream ID on
# startup as every individual pusher will have checked for changes on
@@ -175,9 +183,6 @@ class PusherPool:
user_id: user to remove pushers for
access_tokens: access token *ids* to remove pushers for
"""
- if not self._pusher_shard_config.should_handle(self._instance_name, user_id):
- return
-
tokens = set(access_tokens)
for p in await self.store.get_pushers_by_user_id(user_id):
if p.access_token in tokens:
@@ -380,6 +385,12 @@ class PusherPool:
synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
- await self.store.delete_pusher_by_app_id_pushkey_user_id(
- app_id, pushkey, user_id
- )
+ # We can only delete pushers on master.
+ if self._remove_pusher_client:
+ await self._remove_pusher_client(
+ app_id=app_id, pushkey=pushkey, user_id=user_id
+ )
+ else:
+ await self.store.delete_pusher_by_app_id_pushkey_user_id(
+ app_id, pushkey, user_id
+ )
|