summary refs log tree commit diff
path: root/synapse/storage/databases/main/pusher.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/pusher.py')
-rw-r--r--synapse/storage/databases/main/pusher.py43
1 files changed, 43 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 7cb69dd6bd..74219cb05e 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -373,3 +373,46 @@ class PusherStore(PusherWorkerStore):
             await self.db_pool.runInteraction(
                 "delete_pusher", delete_pusher_txn, stream_id
             )
+
+    async def delete_all_pushers_for_user(self, user_id: str) -> None:
+        """Delete all pushers associated with an account."""
+
+        # We want to generate a row in `deleted_pushers` for each pusher we're
+        # deleting, so we fetch the list now so we can generate the appropriate
+        # number of stream IDs.
+        #
+        # Note: technically there could be a race here between adding/deleting
+        # pushers, but a) the worst case if we don't stop a pusher until the
+        # next restart and b) this is only called when we're deactivating an
+        # account.
+        pushers = list(await self.get_pushers_by_user_id(user_id))
+
+        def delete_pushers_txn(txn, stream_ids):
+            self._invalidate_cache_and_stream(  # type: ignore
+                txn, self.get_if_user_has_pusher, (user_id,)
+            )
+
+            self.db_pool.simple_delete_txn(
+                txn,
+                table="pushers",
+                keyvalues={"user_name": user_id},
+            )
+
+            self.db_pool.simple_insert_many_txn(
+                txn,
+                table="deleted_pushers",
+                values=[
+                    {
+                        "stream_id": stream_id,
+                        "app_id": pusher.app_id,
+                        "pushkey": pusher.pushkey,
+                        "user_id": user_id,
+                    }
+                    for stream_id, pusher in zip(stream_ids, pushers)
+                ],
+            )
+
+        async with self._pushers_id_gen.get_next_mult(len(pushers)) as stream_ids:
+            await self.db_pool.runInteraction(
+                "delete_all_pushers_for_user", delete_pushers_txn, stream_ids
+            )