summary refs log tree commit diff
path: root/synapse/storage/databases/main/pusher.py
diff options
context:
space:
mode:
authorBrendan Abolivier <babolivier@matrix.org>2022-09-21 16:31:53 +0100
committerGitHub <noreply@github.com>2022-09-21 15:31:53 +0000
commitccca14140a019c2e0430f95d78fa075efd8d535f (patch)
tree5a011a9ec69a798f69b0264413a4b536b7063076 /synapse/storage/databases/main/pusher.py
parentImplementation of MSC3882 login token request (#13722) (diff)
downloadsynapse-ccca14140a019c2e0430f95d78fa075efd8d535f.tar.xz
Track device IDs for pushers (#13831)
Second half of the MSC3881 implementation
Diffstat (limited to 'synapse/storage/databases/main/pusher.py')
-rw-r--r--synapse/storage/databases/main/pusher.py73
1 files changed, 71 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index ee55b8c4a9..01206950a9 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -124,7 +124,7 @@ class PusherWorkerStore(SQLBaseStore):
                 id, user_name, access_token, profile_tag, kind, app_id,
                 app_display_name, device_display_name, pushkey, ts, lang, data,
                 last_stream_ordering, last_success, failing_since,
-                COALESCE(enabled, TRUE) AS enabled
+                COALESCE(enabled, TRUE) AS enabled, device_id
             FROM pushers
             """
 
@@ -477,7 +477,74 @@ class PusherWorkerStore(SQLBaseStore):
         return number_deleted
 
 
-class PusherStore(PusherWorkerStore):
+class PusherBackgroundUpdatesStore(SQLBaseStore):
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: LoggingDatabaseConnection,
+        hs: "HomeServer",
+    ):
+        super().__init__(database, db_conn, hs)
+
+        self.db_pool.updates.register_background_update_handler(
+            "set_device_id_for_pushers", self._set_device_id_for_pushers
+        )
+
+    async def _set_device_id_for_pushers(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Background update to populate the device_id column of the pushers table."""
+        last_pusher_id = progress.get("pusher_id", 0)
+
+        def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int:
+            txn.execute(
+                """
+                    SELECT p.id, at.device_id
+                    FROM pushers AS p
+                    INNER JOIN access_tokens AS at
+                        ON p.access_token = at.id
+                    WHERE
+                        p.access_token IS NOT NULL
+                        AND at.device_id IS NOT NULL
+                        AND p.id > ?
+                    ORDER BY p.id
+                    LIMIT ?
+                """,
+                (last_pusher_id, batch_size),
+            )
+
+            rows = self.db_pool.cursor_to_dict(txn)
+            if len(rows) == 0:
+                return 0
+
+            self.db_pool.simple_update_many_txn(
+                txn=txn,
+                table="pushers",
+                key_names=("id",),
+                key_values=[(row["id"],) for row in rows],
+                value_names=("device_id",),
+                value_values=[(row["device_id"],) for row in rows],
+            )
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn, "set_device_id_for_pushers", {"pusher_id": rows[-1]["id"]}
+            )
+
+            return len(rows)
+
+        nb_processed = await self.db_pool.runInteraction(
+            "set_device_id_for_pushers", set_device_id_for_pushers_txn
+        )
+
+        if nb_processed < batch_size:
+            await self.db_pool.updates._end_background_update(
+                "set_device_id_for_pushers"
+            )
+
+        return nb_processed
+
+
+class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
     def get_pushers_stream_token(self) -> int:
         return self._pushers_id_gen.get_current_token()
 
@@ -496,6 +563,7 @@ class PusherStore(PusherWorkerStore):
         last_stream_ordering: int,
         profile_tag: str = "",
         enabled: bool = True,
+        device_id: Optional[str] = None,
     ) -> None:
         async with self._pushers_id_gen.get_next() as stream_id:
             # no need to lock because `pushers` has a unique key on
@@ -515,6 +583,7 @@ class PusherStore(PusherWorkerStore):
                     "profile_tag": profile_tag,
                     "id": stream_id,
                     "enabled": enabled,
+                    "device_id": device_id,
                 },
                 desc="add_pusher",
                 lock=False,