summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/__init__.py7
-rw-r--r--synapse/push/pusherpool.py58
2 files changed, 48 insertions, 17 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index a0c760239d..9e3a98741a 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -103,7 +103,7 @@ class PusherConfig:
 
     id: Optional[str]
     user_name: str
-    access_token: Optional[int]
+
     profile_tag: str
     kind: str
     app_id: str
@@ -119,6 +119,11 @@ class PusherConfig:
     enabled: bool
     device_id: Optional[str]
 
+    # XXX(quenting): The access_token is not persisted anymore for new pushers, but we
+    # keep it when reading from the database, so that we don't get stale pushers
+    # while the "set_device_id_for_pushers" background update is running.
+    access_token: Optional[int]
+
     def as_dict(self) -> Dict[str, Any]:
         """Information that can be retrieved about a pusher after creation."""
         return {
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index e2648cbc93..6517e3566f 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -25,7 +25,7 @@ from synapse.metrics.background_process_metrics import (
 from synapse.push import Pusher, PusherConfig, PusherConfigException
 from synapse.push.pusher import PusherFactory
 from synapse.replication.http.push import ReplicationRemovePusherRestServlet
-from synapse.types import JsonDict, RoomStreamToken
+from synapse.types import JsonDict, RoomStreamToken, StrCollection
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.threepids import canonicalise_email
 
@@ -97,7 +97,6 @@ class PusherPool:
     async def add_or_update_pusher(
         self,
         user_id: str,
-        access_token: Optional[int],
         kind: str,
         app_id: str,
         app_display_name: str,
@@ -128,6 +127,22 @@ class PusherPool:
         # stream ordering, so it will process pushes from this point onwards.
         last_stream_ordering = self.store.get_room_max_stream_ordering()
 
+        # Before we actually persist the pusher, we check if the user already has one
+        # for this app ID and pushkey. If so, we want to keep the access token and
+        # device ID in place, since this could be one device modifying
+        # (e.g. enabling/disabling) another device's pusher.
+        # XXX(quenting): Even though we're not persisting the access_token_id for new
+        # pushers anymore, we still need to copy existing access_token_ids over when
+        # updating a pusher, in case the "set_device_id_for_pushers" background update
+        # hasn't run yet.
+        access_token_id = None
+        existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
+            user_id, app_id, pushkey
+        )
+        if existing_config:
+            device_id = existing_config.device_id
+            access_token_id = existing_config.access_token
+
         # we try to create the pusher just to validate the config: it
         # will then get pulled out of the database,
         # recreated, added and started: this means we have only one
@@ -136,7 +151,6 @@ class PusherPool:
             PusherConfig(
                 id=None,
                 user_name=user_id,
-                access_token=access_token,
                 profile_tag=profile_tag,
                 kind=kind,
                 app_id=app_id,
@@ -151,23 +165,12 @@ class PusherPool:
                 failing_since=None,
                 enabled=enabled,
                 device_id=device_id,
+                access_token=access_token_id,
             )
         )
 
-        # Before we actually persist the pusher, we check if the user already has one
-        # this app ID and pushkey. If so, we want to keep the access token and device ID
-        # in place, since this could be one device modifying (e.g. enabling/disabling)
-        # another device's pusher.
-        existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
-            user_id, app_id, pushkey
-        )
-        if existing_config:
-            access_token = existing_config.access_token
-            device_id = existing_config.device_id
-
         await self.store.add_pusher(
             user_id=user_id,
-            access_token=access_token,
             kind=kind,
             app_id=app_id,
             app_display_name=app_display_name,
@@ -180,6 +183,7 @@ class PusherPool:
             profile_tag=profile_tag,
             enabled=enabled,
             device_id=device_id,
+            access_token_id=access_token_id,
         )
         pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id)
 
@@ -199,7 +203,7 @@ class PusherPool:
                 )
                 await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
 
-    async def remove_pushers_by_access_token(
+    async def remove_pushers_by_access_tokens(
         self, user_id: str, access_tokens: Iterable[int]
     ) -> None:
         """Remove the pushers for a given user corresponding to a set of
@@ -209,6 +213,8 @@ class PusherPool:
             user_id: user to remove pushers for
             access_tokens: access token *ids* to remove pushers for
         """
+        # XXX(quenting): This is only needed until the "set_device_id_for_pushers"
+        # background update finishes
         tokens = set(access_tokens)
         for p in await self.store.get_pushers_by_user_id(user_id):
             if p.access_token in tokens:
@@ -220,6 +226,26 @@ class PusherPool:
                 )
                 await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
 
+    async def remove_pushers_by_devices(
+        self, user_id: str, devices: StrCollection
+    ) -> None:
+        """Remove the pushers for a given user corresponding to a set of devices
+
+        Args:
+            user_id: user to remove pushers for
+            devices: device IDs to remove pushers for
+        """
+        device_ids = set(devices)
+        for p in await self.store.get_pushers_by_user_id(user_id):
+            if p.device_id in device_ids:
+                logger.info(
+                    "Removing pusher for app id %s, pushkey %s, user %s",
+                    p.app_id,
+                    p.pushkey,
+                    p.user_name,
+                )
+                await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
+
     def on_new_notifications(self, max_token: RoomStreamToken) -> None:
         if not self.pushers:
             # nothing to do here.