summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/pusher.py72
1 files changed, 72 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index b48fe086d4..e47caa2125 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -48,6 +48,11 @@ class PusherWorkerStore(SQLBaseStore):
             self._remove_stale_pushers,
         )
 
+        self.db_pool.updates.register_background_update_handler(
+            "remove_deleted_email_pushers",
+            self._remove_deleted_email_pushers,
+        )
+
     def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
         """JSON-decode the data in the rows returned from the `pushers` table
 
@@ -388,6 +393,73 @@ class PusherWorkerStore(SQLBaseStore):
 
         return number_deleted
 
+    async def _remove_deleted_email_pushers(
+        self, progress: dict, batch_size: int
+    ) -> int:
+        """A background update that deletes all pushers for deleted email addresses.
+
+        In previous versions of synapse, when users deleted their email address, it didn't
+        also delete all the pushers for that email address. This background update removes
+        those to prevent unwanted emails. This should only need to be run once (when users
+        upgrade to v1.42.0
+
+        Args:
+            progress: dict used to store progress of this background update
+            batch_size: the maximum number of rows to retrieve in a single select query
+
+        Returns:
+            The number of deleted rows
+        """
+
+        last_pusher = progress.get("last_pusher", 0)
+
+        def _delete_pushers(txn) -> int:
+
+            sql = """
+                SELECT p.id, p.user_name, p.app_id, p.pushkey
+                FROM pushers AS p
+                    LEFT JOIN user_threepids AS t
+                        ON t.user_id = p.user_name
+                        AND t.medium = 'email'
+                        AND t.address = p.pushkey
+                WHERE t.user_id is NULL
+                    AND p.app_id = 'm.email'
+                    AND p.id > ?
+                ORDER BY p.id ASC
+                LIMIT ?
+            """
+
+            txn.execute(sql, (last_pusher, batch_size))
+
+            last = None
+            num_deleted = 0
+            for row in txn:
+                last = row[0]
+                num_deleted += 1
+                self.db_pool.simple_delete_txn(
+                    txn,
+                    "pushers",
+                    {"user_name": row[1], "app_id": row[2], "pushkey": row[3]},
+                )
+
+            if last is not None:
+                self.db_pool.updates._background_update_progress_txn(
+                    txn, "remove_deleted_email_pushers", {"last_pusher": last}
+                )
+
+            return num_deleted
+
+        number_deleted = await self.db_pool.runInteraction(
+            "_remove_deleted_email_pushers", _delete_pushers
+        )
+
+        if number_deleted < batch_size:
+            await self.db_pool.updates._end_background_update(
+                "remove_deleted_email_pushers"
+            )
+
+        return number_deleted
+
 
 class PusherStore(PusherWorkerStore):
     def get_pushers_stream_token(self) -> int: