diff --git a/changelog.d/10734.bugfix b/changelog.d/10734.bugfix
new file mode 100644
index 0000000000..15c7da4497
--- /dev/null
+++ b/changelog.d/10734.bugfix
@@ -0,0 +1 @@
+Remove pushers when deleting a 3pid from an account. Pushers for old unlinked emails will also be deleted.
\ No newline at end of file
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index e47caa2125..63ac09c61d 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -430,10 +430,11 @@ class PusherWorkerStore(SQLBaseStore):
"""
txn.execute(sql, (last_pusher, batch_size))
+ rows = txn.fetchall()
last = None
num_deleted = 0
- for row in txn:
+ for row in rows:
last = row[0]
num_deleted += 1
self.db_pool.simple_delete_txn(
diff --git a/tests/push/test_email.py b/tests/push/test_email.py
index eea07485a0..c4ba13a6b2 100644
--- a/tests/push/test_email.py
+++ b/tests/push/test_email.py
@@ -344,6 +344,50 @@ class EmailPusherTests(HomeserverTestCase):
pushers = list(pushers)
self.assertEqual(len(pushers), 0)
+ def test_remove_unlinked_pushers_background_job(self):
+ """Checks that all existing pushers associated with unlinked email addresses are removed
+ upon running the remove_deleted_email_pushers background update.
+ """
+ # disassociate the user's email address manually (without deleting the pusher).
+ # This resembles the old behaviour, which the background update below is intended
+ # to clean up.
+ self.get_success(
+ self.hs.get_datastore().user_delete_threepid(
+ self.user_id, "email", "a@example.com"
+ )
+ )
+
+ # Run the "remove_deleted_email_pushers" background job
+ self.get_success(
+ self.hs.get_datastore().db_pool.simple_insert(
+ table="background_updates",
+ values={
+ "update_name": "remove_deleted_email_pushers",
+ "progress_json": "{}",
+ "depends_on": None,
+ },
+ )
+ )
+
+ # ... and tell the DataStore that it hasn't finished all updates yet
+ self.hs.get_datastore().db_pool.updates._all_done = False
+
+ # Now let's actually drive the updates to completion
+ while not self.get_success(
+ self.hs.get_datastore().db_pool.updates.has_completed_background_updates()
+ ):
+ self.get_success(
+ self.hs.get_datastore().db_pool.updates.do_next_background_update(100),
+ by=0.1,
+ )
+
+ # Check that all pushers with unlinked addresses were deleted
+ pushers = self.get_success(
+ self.hs.get_datastore().get_pushers_by({"user_name": self.user_id})
+ )
+ pushers = list(pushers)
+ self.assertEqual(len(pushers), 0)
+
def _check_for_mail(self):
"""Check that the user receives an email notification"""
|