diff options
-rw-r--r-- | changelog.d/9536.bugfix | 1 | ||||
-rw-r--r-- | synapse/storage/databases/main/pusher.py | 52 | ||||
-rw-r--r-- | synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql | 3 |
3 files changed, 55 insertions, 1 deletions
diff --git a/changelog.d/9536.bugfix b/changelog.d/9536.bugfix new file mode 100644 index 0000000000..2ab4f315c1 --- /dev/null +++ b/changelog.d/9536.bugfix @@ -0,0 +1 @@ +Fix deleting pushers when using sharded pushers. diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index 6b608ebc9b..85f1ebac98 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -44,6 +44,11 @@ class PusherWorkerStore(SQLBaseStore): self._remove_deactivated_pushers, ) + self.db_pool.updates.register_background_update_handler( + "remove_stale_pushers", + self._remove_stale_pushers, + ) + def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]: """JSON-decode the data in the rows returned from the `pushers` table @@ -337,6 +342,53 @@ class PusherWorkerStore(SQLBaseStore): return number_deleted + async def _remove_stale_pushers(self, progress: dict, batch_size: int) -> int: + """A background update that deletes all pushers for logged out devices. + + Note that we don't proacively tell the pusherpool that we've deleted + these (just because its a bit off a faff to do from here), but they will + get cleaned up at the next restart + """ + + last_pusher = progress.get("last_pusher", 0) + + def _delete_pushers(txn) -> int: + + sql = """ + SELECT p.id, access_token FROM pushers AS p + LEFT JOIN access_tokens AS a ON (p.access_token = a.id) + WHERE p.id > ? + ORDER BY p.id ASC + LIMIT ? + """ + + txn.execute(sql, (last_pusher, batch_size)) + pushers = [(row[0], row[1]) for row in txn] + + self.db_pool.simple_delete_many_txn( + txn, + table="pushers", + column="id", + iterable=(pusher_id for pusher_id, token in pushers if token is None), + keyvalues={}, + ) + + if pushers: + self.db_pool.updates._background_update_progress_txn( + txn, "remove_stale_pushers", {"last_pusher": pushers[-1][0]} + ) + + return len(pushers) + + number_deleted = await self.db_pool.runInteraction( + "_remove_stale_pushers", _delete_pushers + ) + + if number_deleted < batch_size: + await self.db_pool.updates._end_background_update("remove_stale_pushers") + + return number_deleted + class PusherStore(PusherWorkerStore): def get_pushers_stream_token(self) -> int: diff --git a/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql b/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql index 2442eea6bc..85196db288 100644 --- a/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql +++ b/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql @@ -16,4 +16,5 @@ -- Delete all pushers associated with deleted devices. This is to clear up after -- a bug where they weren't correctly deleted when using workers. -DELETE FROM pushers WHERE access_token NOT IN (SELECT id FROM access_tokens); +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (5908, 'remove_stale_pushers', '{}'); |