summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rwxr-xr-xsynapse/_scripts/synapse_port_db.py6
-rw-r--r--synapse/handlers/auth.py8
-rw-r--r--synapse/handlers/device.py2
-rw-r--r--synapse/handlers/register.py4
-rw-r--r--synapse/push/__init__.py7
-rw-r--r--synapse/push/pusherpool.py58
-rw-r--r--synapse/rest/admin/users.py1
-rw-r--r--synapse/rest/client/pusher.py1
-rw-r--r--synapse/storage/databases/main/pusher.py40
-rw-r--r--synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql19
10 files changed, 113 insertions, 33 deletions
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
index 94b86c1d6f..1dcb397ba4 100755
--- a/synapse/_scripts/synapse_port_db.py
+++ b/synapse/_scripts/synapse_port_db.py
@@ -68,7 +68,10 @@ from synapse.storage.databases.main.media_repository import (
     MediaRepositoryBackgroundUpdateStore,
 )
 from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
-from synapse.storage.databases.main.pusher import PusherWorkerStore
+from synapse.storage.databases.main.pusher import (
+    PusherBackgroundUpdatesStore,
+    PusherWorkerStore,
+)
 from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore
 from synapse.storage.databases.main.registration import (
     RegistrationBackgroundUpdateStore,
@@ -226,6 +229,7 @@ class Store(
     AccountDataWorkerStore,
     PushRuleStore,
     PusherWorkerStore,
+    PusherBackgroundUpdatesStore,
     PresenceBackgroundUpdateStore,
     ReceiptsBackgroundUpdateStore,
     RelationsWorkerStore,
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 308e38edea..1e89447044 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -1504,8 +1504,10 @@ class AuthHandler:
         )
 
         # delete pushers associated with this access token
+        # XXX(quenting): This is only needed until the 'set_device_id_for_pushers'
+        # background update completes.
         if token.token_id is not None:
-            await self.hs.get_pusherpool().remove_pushers_by_access_token(
+            await self.hs.get_pusherpool().remove_pushers_by_access_tokens(
                 token.user_id, (token.token_id,)
             )
 
@@ -1535,7 +1537,9 @@ class AuthHandler:
             )
 
         # delete pushers associated with the access tokens
-        await self.hs.get_pusherpool().remove_pushers_by_access_token(
+        # XXX(quenting): This is only needed until the 'set_device_id_for_pushers'
+        # background update completes.
+        await self.hs.get_pusherpool().remove_pushers_by_access_tokens(
             user_id, (token_id for _, token_id, _ in tokens_and_devices)
         )
 
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 6f7963df43..9ded6389ac 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -503,6 +503,8 @@ class DeviceHandler(DeviceWorkerHandler):
             else:
                 raise
 
+        await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids)
+
         # Delete data specific to each device. Not optimised as it is not
         # considered as part of a critical path.
         for device_id in device_ids:
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 6b110dcb6e..c8bf2439af 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -1013,11 +1013,11 @@ class RegistrationHandler:
             user_tuple = await self.store.get_user_by_access_token(token)
             # The token better still exist.
             assert user_tuple
-            token_id = user_tuple.token_id
+            device_id = user_tuple.device_id
 
             await self.pusher_pool.add_or_update_pusher(
                 user_id=user_id,
-                access_token=token_id,
+                device_id=device_id,
                 kind="email",
                 app_id="m.email",
                 app_display_name="Email Notifications",
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index a0c760239d..9e3a98741a 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -103,7 +103,7 @@ class PusherConfig:
 
     id: Optional[str]
     user_name: str
-    access_token: Optional[int]
+
     profile_tag: str
     kind: str
     app_id: str
@@ -119,6 +119,11 @@ class PusherConfig:
     enabled: bool
     device_id: Optional[str]
 
+    # XXX(quenting): The access_token is not persisted anymore for new pushers, but we
+    # keep it when reading from the database, so that we don't get stale pushers
+    # while the "set_device_id_for_pushers" background update is running.
+    access_token: Optional[int]
+
     def as_dict(self) -> Dict[str, Any]:
         """Information that can be retrieved about a pusher after creation."""
         return {
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index e2648cbc93..6517e3566f 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -25,7 +25,7 @@ from synapse.metrics.background_process_metrics import (
 from synapse.push import Pusher, PusherConfig, PusherConfigException
 from synapse.push.pusher import PusherFactory
 from synapse.replication.http.push import ReplicationRemovePusherRestServlet
-from synapse.types import JsonDict, RoomStreamToken
+from synapse.types import JsonDict, RoomStreamToken, StrCollection
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.threepids import canonicalise_email
 
@@ -97,7 +97,6 @@ class PusherPool:
     async def add_or_update_pusher(
         self,
         user_id: str,
-        access_token: Optional[int],
         kind: str,
         app_id: str,
         app_display_name: str,
@@ -128,6 +127,22 @@ class PusherPool:
         # stream ordering, so it will process pushes from this point onwards.
         last_stream_ordering = self.store.get_room_max_stream_ordering()
 
+        # Before we actually persist the pusher, we check if the user already has one
+        # for this app ID and pushkey. If so, we want to keep the access token and
+        # device ID in place, since this could be one device modifying
+        # (e.g. enabling/disabling) another device's pusher.
+        # XXX(quenting): Even though we're not persisting the access_token_id for new
+        # pushers anymore, we still need to copy existing access_token_ids over when
+        # updating a pusher, in case the "set_device_id_for_pushers" background update
+        # hasn't run yet.
+        access_token_id = None
+        existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
+            user_id, app_id, pushkey
+        )
+        if existing_config:
+            device_id = existing_config.device_id
+            access_token_id = existing_config.access_token
+
         # we try to create the pusher just to validate the config: it
         # will then get pulled out of the database,
         # recreated, added and started: this means we have only one
@@ -136,7 +151,6 @@ class PusherPool:
             PusherConfig(
                 id=None,
                 user_name=user_id,
-                access_token=access_token,
                 profile_tag=profile_tag,
                 kind=kind,
                 app_id=app_id,
@@ -151,23 +165,12 @@ class PusherPool:
                 failing_since=None,
                 enabled=enabled,
                 device_id=device_id,
+                access_token=access_token_id,
             )
         )
 
-        # Before we actually persist the pusher, we check if the user already has one
-        # this app ID and pushkey. If so, we want to keep the access token and device ID
-        # in place, since this could be one device modifying (e.g. enabling/disabling)
-        # another device's pusher.
-        existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
-            user_id, app_id, pushkey
-        )
-        if existing_config:
-            access_token = existing_config.access_token
-            device_id = existing_config.device_id
-
         await self.store.add_pusher(
             user_id=user_id,
-            access_token=access_token,
             kind=kind,
             app_id=app_id,
             app_display_name=app_display_name,
@@ -180,6 +183,7 @@ class PusherPool:
             profile_tag=profile_tag,
             enabled=enabled,
             device_id=device_id,
+            access_token_id=access_token_id,
         )
         pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id)
 
@@ -199,7 +203,7 @@ class PusherPool:
                 )
                 await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
 
-    async def remove_pushers_by_access_token(
+    async def remove_pushers_by_access_tokens(
         self, user_id: str, access_tokens: Iterable[int]
     ) -> None:
         """Remove the pushers for a given user corresponding to a set of
@@ -209,6 +213,8 @@ class PusherPool:
             user_id: user to remove pushers for
             access_tokens: access token *ids* to remove pushers for
         """
+        # XXX(quenting): This is only needed until the "set_device_id_for_pushers"
+        # background update finishes
         tokens = set(access_tokens)
         for p in await self.store.get_pushers_by_user_id(user_id):
             if p.access_token in tokens:
@@ -220,6 +226,26 @@ class PusherPool:
                 )
                 await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
 
+    async def remove_pushers_by_devices(
+        self, user_id: str, devices: StrCollection
+    ) -> None:
+        """Remove the pushers for a given user corresponding to a set of devices
+
+        Args:
+            user_id: user to remove pushers for
+            devices: device IDs to remove pushers for
+        """
+        device_ids = set(devices)
+        for p in await self.store.get_pushers_by_user_id(user_id):
+            if p.device_id in device_ids:
+                logger.info(
+                    "Removing pusher for app id %s, pushkey %s, user %s",
+                    p.app_id,
+                    p.pushkey,
+                    p.user_name,
+                )
+                await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
+
     def on_new_notifications(self, max_token: RoomStreamToken) -> None:
         if not self.pushers:
             # nothing to do here.
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 281e8fd0ad..331f225116 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -425,7 +425,6 @@ class UserRestServletV2(RestServlet):
                     ):
                         await self.pusher_pool.add_or_update_pusher(
                             user_id=user_id,
-                            access_token=None,
                             kind="email",
                             app_id="m.email",
                             app_display_name="Email Notifications",
diff --git a/synapse/rest/client/pusher.py b/synapse/rest/client/pusher.py
index 975eef2144..1a8f5292ac 100644
--- a/synapse/rest/client/pusher.py
+++ b/synapse/rest/client/pusher.py
@@ -126,7 +126,6 @@ class PushersSetRestServlet(RestServlet):
         try:
             await self.pusher_pool.add_or_update_pusher(
                 user_id=user.to_string(),
-                access_token=requester.access_token_id,
                 kind=content["kind"],
                 app_id=content["app_id"],
                 app_display_name=content["app_display_name"],
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 9a24f7a655..ab76b754e0 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -509,19 +509,24 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
     async def _set_device_id_for_pushers(
         self, progress: JsonDict, batch_size: int
     ) -> int:
-        """Background update to populate the device_id column of the pushers table."""
+        """
+        Background update to populate the device_id column and clear the access_token
+        column for the pushers table.
+        """
         last_pusher_id = progress.get("pusher_id", 0)
 
         def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int:
             txn.execute(
                 """
-                    SELECT p.id, at.device_id
+                    SELECT 
+                        p.id AS pusher_id,
+                        p.device_id AS pusher_device_id,
+                        at.device_id AS token_device_id
                     FROM pushers AS p
-                    INNER JOIN access_tokens AS at
+                    LEFT JOIN access_tokens AS at
                         ON p.access_token = at.id
                     WHERE
                         p.access_token IS NOT NULL
-                        AND at.device_id IS NOT NULL
                         AND p.id > ?
                     ORDER BY p.id
                     LIMIT ?
@@ -533,13 +538,27 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
             if len(rows) == 0:
                 return 0
 
+            # The reason we're clearing the access_token column here is a bit subtle.
+            # When a user logs out, we:
+            #  (1) delete the access token
+            #  (2) delete the device
+            #
+            # Ideally, we would delete the pushers only via its link to the device
+            # during (2), but since this background update might not have fully run yet,
+            # we're still deleting the pushers via the access token during (1).
             self.db_pool.simple_update_many_txn(
                 txn=txn,
                 table="pushers",
                 key_names=("id",),
-                key_values=[(row["id"],) for row in rows],
-                value_names=("device_id",),
-                value_values=[(row["device_id"],) for row in rows],
+                key_values=[(row["pusher_id"],) for row in rows],
+                value_names=("device_id", "access_token"),
+                # If there was already a device_id on the pusher, we only want to clear
+                # the access_token column, so we keep the existing device_id. Otherwise,
+                # we set the device_id we got from joining the access_tokens table.
+                value_values=[
+                    (row["pusher_device_id"] or row["token_device_id"], None)
+                    for row in rows
+                ],
             )
 
             self.db_pool.updates._background_update_progress_txn(
@@ -568,7 +587,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
     async def add_pusher(
         self,
         user_id: str,
-        access_token: Optional[int],
         kind: str,
         app_id: str,
         app_display_name: str,
@@ -581,13 +599,13 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
         profile_tag: str = "",
         enabled: bool = True,
         device_id: Optional[str] = None,
+        access_token_id: Optional[int] = None,
     ) -> None:
         async with self._pushers_id_gen.get_next() as stream_id:
             await self.db_pool.simple_upsert(
                 table="pushers",
                 keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
                 values={
-                    "access_token": access_token,
                     "kind": kind,
                     "app_display_name": app_display_name,
                     "device_display_name": device_display_name,
@@ -599,6 +617,10 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
                     "id": stream_id,
                     "enabled": enabled,
                     "device_id": device_id,
+                    # XXX(quenting): We're only really persisting the access token ID
+                    # when updating an existing pusher. This is in case the
+                    # 'set_device_id_for_pushers' background update hasn't finished yet.
+                    "access_token": access_token_id,
                 },
                 desc="add_pusher",
             )
diff --git a/synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql b/synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql
new file mode 100644
index 0000000000..1367fb6267
--- /dev/null
+++ b/synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql
@@ -0,0 +1,19 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Triggers the background update to set the device_id for pushers
+-- that don't have one, and clear the access_token column.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+    (7402, 'set_device_id_for_pushers', '{}');