diff --git a/changelog.d/9516.bugfix b/changelog.d/9516.bugfix
new file mode 100644
index 0000000000..81188c5473
--- /dev/null
+++ b/changelog.d/9516.bugfix
@@ -0,0 +1 @@
+Fix a bug where users' pushers were not all deleted when they deactivated their account.
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index d2aaea08f5..58edf6af6c 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -47,6 +47,7 @@ from synapse.storage.databases.main.events_bg_updates import (
from synapse.storage.databases.main.media_repository import (
MediaRepositoryBackgroundUpdateStore,
)
+from synapse.storage.databases.main.pusher import PusherWorkerStore
from synapse.storage.databases.main.registration import (
RegistrationBackgroundUpdateStore,
find_max_generated_user_id_localpart,
@@ -177,6 +178,7 @@ class Store(
UserDirectoryBackgroundUpdateStore,
EndToEndKeyBackgroundStore,
StatsStore,
+ PusherWorkerStore,
):
def execute(self, f, *args, **kwargs):
return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 74219cb05e..6b608ebc9b 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -39,6 +39,11 @@ class PusherWorkerStore(SQLBaseStore):
db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
)
+ self.db_pool.updates.register_background_update_handler(
+ "remove_deactivated_pushers",
+ self._remove_deactivated_pushers,
+ )
+
def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
"""JSON-decode the data in the rows returned from the `pushers` table
@@ -284,6 +289,54 @@ class PusherWorkerStore(SQLBaseStore):
lock=False,
)
+ async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int:
+ """A background update that deletes all pushers for deactivated users.
+
+ Note that we don't proacively tell the pusherpool that we've deleted
+ these (just because its a bit off a faff to do from here), but they will
+ get cleaned up at the next restart
+ """
+
+ last_user = progress.get("last_user", "")
+
+ def _delete_pushers(txn) -> int:
+
+ sql = """
+ SELECT name FROM users
+ WHERE deactivated = ? and name > ?
+ ORDER BY name ASC
+ LIMIT ?
+ """
+
+ txn.execute(sql, (1, last_user, batch_size))
+ users = [row[0] for row in txn]
+
+ self.db_pool.simple_delete_many_txn(
+ txn,
+ table="pushers",
+ column="user_name",
+ iterable=users,
+ keyvalues={},
+ )
+
+ if users:
+ self.db_pool.updates._background_update_progress_txn(
+ txn, "remove_deactivated_pushers", {"last_user": users[-1]}
+ )
+
+ return len(users)
+
+ number_deleted = await self.db_pool.runInteraction(
+ "_remove_deactivated_pushers", _delete_pushers
+ )
+
+ if number_deleted < batch_size:
+ await self.db_pool.updates._end_background_update(
+ "remove_deactivated_pushers"
+ )
+
+ return number_deleted
+
class PusherStore(PusherWorkerStore):
def get_pushers_stream_token(self) -> int:
diff --git a/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql b/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql
index 20ba4abca3..0ec6764150 100644
--- a/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql
+++ b/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql
@@ -14,8 +14,7 @@
*/
--- We may not have deleted all pushers for deactivated accounts. Do so now.
---
--- Note: We don't bother updating the `deleted_pushers` table as it's just use
--- to stop pushers on workers, and that will happen when they get next restarted.
-DELETE FROM pushers WHERE user_name IN (SELECT name FROM users WHERE deactivated = 1);
+-- We may not have deleted all pushers for deactivated accounts, so we set up a
+-- background job to delete them.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (5908, 'remove_deactivated_pushers', '{}');
|