diff --git a/changelog.d/9285.bugfix b/changelog.d/9285.bugfix
new file mode 100644
index 0000000000..81188c5473
--- /dev/null
+++ b/changelog.d/9285.bugfix
@@ -0,0 +1 @@
+Fix a bug where users' pushers were not all deleted when they deactivated their account.
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 94f3f3163f..3886d3124d 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -120,6 +120,11 @@ class DeactivateAccountHandler(BaseHandler):
await self.store.user_set_password_hash(user_id, None)
+ # Most of the pushers will have been deleted when we logged out the
+ # associated devices above, but we still need to delete pushers not
+ # associated with devices, e.g. email pushers.
+ await self.store.delete_all_pushers_for_user(user_id)
+
# Add the user to a table of users pending deactivation (ie.
# removal from all the rooms they're a member of)
await self.store.add_user_pending_deactivation(user_id)
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 7cb69dd6bd..74219cb05e 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -373,3 +373,46 @@ class PusherStore(PusherWorkerStore):
await self.db_pool.runInteraction(
"delete_pusher", delete_pusher_txn, stream_id
)
+
+ async def delete_all_pushers_for_user(self, user_id: str) -> None:
+ """Delete all pushers associated with an account."""
+
+ # We want to generate a row in `deleted_pushers` for each pusher we're
+ # deleting, so we fetch the list now so we can generate the appropriate
+ # number of stream IDs.
+ #
+ # Note: technically there could be a race here between adding/deleting
+ # pushers, but a) the worst case if we don't stop a pusher until the
+ # next restart and b) this is only called when we're deactivating an
+ # account.
+ pushers = list(await self.get_pushers_by_user_id(user_id))
+
+ def delete_pushers_txn(txn, stream_ids):
+ self._invalidate_cache_and_stream( # type: ignore
+ txn, self.get_if_user_has_pusher, (user_id,)
+ )
+
+ self.db_pool.simple_delete_txn(
+ txn,
+ table="pushers",
+ keyvalues={"user_name": user_id},
+ )
+
+ self.db_pool.simple_insert_many_txn(
+ txn,
+ table="deleted_pushers",
+ values=[
+ {
+ "stream_id": stream_id,
+ "app_id": pusher.app_id,
+ "pushkey": pusher.pushkey,
+ "user_id": user_id,
+ }
+ for stream_id, pusher in zip(stream_ids, pushers)
+ ],
+ )
+
+ async with self._pushers_id_gen.get_next_mult(len(pushers)) as stream_ids:
+ await self.db_pool.runInteraction(
+ "delete_all_pushers_for_user", delete_pushers_txn, stream_ids
+ )
diff --git a/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql b/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql
new file mode 100644
index 0000000000..20ba4abca3
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql
@@ -0,0 +1,21 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- We may not have deleted all pushers for deactivated accounts. Do so now.
+--
+-- Note: We don't bother updating the `deleted_pushers` table as it's just use
+-- to stop pushers on workers, and that will happen when they get next restarted.
+DELETE FROM pushers WHERE user_name IN (SELECT name FROM users WHERE deactivated = 1);
|