diff options
Diffstat (limited to 'synapse/push/pusherpool.py')
-rw-r--r-- | synapse/push/pusherpool.py | 27 |
1 files changed, 20 insertions, 7 deletions
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index ae1145be0e..21f14f05f0 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -25,6 +25,7 @@ from synapse.metrics.background_process_metrics import ( ) from synapse.push import Pusher, PusherConfig, PusherConfigException from synapse.push.pusher import PusherFactory +from synapse.replication.http.push import ReplicationRemovePusherRestServlet from synapse.types import JsonDict, RoomStreamToken from synapse.util.async_helpers import concurrently_execute @@ -58,7 +59,6 @@ class PusherPool: def __init__(self, hs: "HomeServer"): self.hs = hs self.pusher_factory = PusherFactory(hs) - self._should_start_pushers = hs.config.start_pushers self.store = self.hs.get_datastore() self.clock = self.hs.get_clock() @@ -67,6 +67,16 @@ class PusherPool: # We shard the handling of push notifications by user ID. self._pusher_shard_config = hs.config.push.pusher_shard_config self._instance_name = hs.get_instance_name() + self._should_start_pushers = ( + self._instance_name in self._pusher_shard_config.instances + ) + + # We can only delete pushers on master. + self._remove_pusher_client = None + if hs.config.worker.worker_app: + self._remove_pusher_client = ReplicationRemovePusherRestServlet.make_client( + hs + ) # Record the last stream ID that we were poked about so we can get # changes since then. We set this to the current max stream ID on @@ -175,9 +185,6 @@ class PusherPool: user_id: user to remove pushers for access_tokens: access token *ids* to remove pushers for """ - if not self._pusher_shard_config.should_handle(self._instance_name, user_id): - return - tokens = set(access_tokens) for p in await self.store.get_pushers_by_user_id(user_id): if p.access_token in tokens: @@ -380,6 +387,12 @@ class PusherPool: synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec() - await self.store.delete_pusher_by_app_id_pushkey_user_id( - app_id, pushkey, user_id - ) + # We can only delete pushers on master. + if self._remove_pusher_client: + await self._remove_pusher_client( + app_id=app_id, pushkey=pushkey, user_id=user_id + ) + else: + await self.store.delete_pusher_by_app_id_pushkey_user_id( + app_id, pushkey, user_id + ) |