diff --git a/changelog.d/9536.bugfix b/changelog.d/9536.bugfix
new file mode 100644
index 0000000000..2ab4f315c1
--- /dev/null
+++ b/changelog.d/9536.bugfix
@@ -0,0 +1 @@
+Fix deleting pushers when using sharded pushers.
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 6b608ebc9b..85f1ebac98 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -44,6 +44,11 @@ class PusherWorkerStore(SQLBaseStore):
self._remove_deactivated_pushers,
)
+ self.db_pool.updates.register_background_update_handler(
+ "remove_stale_pushers",
+ self._remove_stale_pushers,
+ )
+
def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
"""JSON-decode the data in the rows returned from the `pushers` table
@@ -337,6 +342,53 @@ class PusherWorkerStore(SQLBaseStore):
return number_deleted
+ async def _remove_stale_pushers(self, progress: dict, batch_size: int) -> int:
+ """A background update that deletes all pushers for logged out devices.
+
+ Note that we don't proacively tell the pusherpool that we've deleted
+ these (just because its a bit off a faff to do from here), but they will
+ get cleaned up at the next restart
+ """
+
+ last_pusher = progress.get("last_pusher", 0)
+
+ def _delete_pushers(txn) -> int:
+
+ sql = """
+ SELECT p.id, access_token FROM pushers AS p
+ LEFT JOIN access_tokens AS a ON (p.access_token = a.id)
+ WHERE p.id > ?
+ ORDER BY p.id ASC
+ LIMIT ?
+ """
+
+ txn.execute(sql, (last_pusher, batch_size))
+ pushers = [(row[0], row[1]) for row in txn]
+
+ self.db_pool.simple_delete_many_txn(
+ txn,
+ table="pushers",
+ column="id",
+ iterable=(pusher_id for pusher_id, token in pushers if token is None),
+ keyvalues={},
+ )
+
+ if pushers:
+ self.db_pool.updates._background_update_progress_txn(
+ txn, "remove_stale_pushers", {"last_pusher": pushers[-1][0]}
+ )
+
+ return len(pushers)
+
+ number_deleted = await self.db_pool.runInteraction(
+ "_remove_stale_pushers", _delete_pushers
+ )
+
+ if number_deleted < batch_size:
+ await self.db_pool.updates._end_background_update("remove_stale_pushers")
+
+ return number_deleted
+
class PusherStore(PusherWorkerStore):
def get_pushers_stream_token(self) -> int:
diff --git a/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql b/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql
index 2442eea6bc..85196db288 100644
--- a/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql
+++ b/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql
@@ -16,4 +16,5 @@
-- Delete all pushers associated with deleted devices. This is to clear up after
-- a bug where they weren't correctly deleted when using workers.
-DELETE FROM pushers WHERE access_token NOT IN (SELECT id FROM access_tokens);
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (5908, 'remove_stale_pushers', '{}');
|