summary refs log tree commit diff
path: root/synapse/push/pusherpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push/pusherpool.py')
-rw-r--r--synapse/push/pusherpool.py33
1 files changed, 26 insertions, 7 deletions
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index ae1145be0e..4c7f5fecee 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,12 +19,14 @@ from typing import TYPE_CHECKING, Dict, Iterable, Optional
 
 from prometheus_client import Gauge
 
+from synapse.api.errors import Codes, SynapseError
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
 )
 from synapse.push import Pusher, PusherConfig, PusherConfigException
 from synapse.push.pusher import PusherFactory
+from synapse.replication.http.push import ReplicationRemovePusherRestServlet
 from synapse.types import JsonDict, RoomStreamToken
 from synapse.util.async_helpers import concurrently_execute
 
@@ -58,7 +60,6 @@ class PusherPool:
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.pusher_factory = PusherFactory(hs)
-        self._should_start_pushers = hs.config.start_pushers
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
 
@@ -67,6 +68,16 @@ class PusherPool:
         # We shard the handling of push notifications by user ID.
         self._pusher_shard_config = hs.config.push.pusher_shard_config
         self._instance_name = hs.get_instance_name()
+        self._should_start_pushers = (
+            self._instance_name in self._pusher_shard_config.instances
+        )
+
+        # We can only delete pushers on master.
+        self._remove_pusher_client = None
+        if hs.config.worker.worker_app:
+            self._remove_pusher_client = ReplicationRemovePusherRestServlet.make_client(
+                hs
+            )
 
         # Record the last stream ID that we were poked about so we can get
         # changes since then. We set this to the current max stream ID on
@@ -103,6 +114,11 @@ class PusherPool:
             The newly created pusher.
         """
 
+        if kind == "email":
+            email_owner = await self.store.get_user_id_by_threepid("email", pushkey)
+            if email_owner != user_id:
+                raise SynapseError(400, "Email not found", Codes.THREEPID_NOT_FOUND)
+
         time_now_msec = self.clock.time_msec()
 
         # create the pusher setting last_stream_ordering to the current maximum
@@ -175,9 +191,6 @@ class PusherPool:
             user_id: user to remove pushers for
             access_tokens: access token *ids* to remove pushers for
         """
-        if not self._pusher_shard_config.should_handle(self._instance_name, user_id):
-            return
-
         tokens = set(access_tokens)
         for p in await self.store.get_pushers_by_user_id(user_id):
             if p.access_token in tokens:
@@ -380,6 +393,12 @@ class PusherPool:
 
             synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
 
-        await self.store.delete_pusher_by_app_id_pushkey_user_id(
-            app_id, pushkey, user_id
-        )
+        # We can only delete pushers on master.
+        if self._remove_pusher_client:
+            await self._remove_pusher_client(
+                app_id=app_id, pushkey=pushkey, user_id=user_id
+            )
+        else:
+            await self.store.delete_pusher_by_app_id_pushkey_user_id(
+                app_id, pushkey, user_id
+            )