summary refs log tree commit diff
path: root/synapse/push/pusherpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push/pusherpool.py')
-rw-r--r--synapse/push/pusherpool.py58
1 files changed, 42 insertions, 16 deletions
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index e2648cbc93..6517e3566f 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -25,7 +25,7 @@ from synapse.metrics.background_process_metrics import (
 from synapse.push import Pusher, PusherConfig, PusherConfigException
 from synapse.push.pusher import PusherFactory
 from synapse.replication.http.push import ReplicationRemovePusherRestServlet
-from synapse.types import JsonDict, RoomStreamToken
+from synapse.types import JsonDict, RoomStreamToken, StrCollection
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.threepids import canonicalise_email
 
@@ -97,7 +97,6 @@ class PusherPool:
     async def add_or_update_pusher(
         self,
         user_id: str,
-        access_token: Optional[int],
         kind: str,
         app_id: str,
         app_display_name: str,
@@ -128,6 +127,22 @@ class PusherPool:
         # stream ordering, so it will process pushes from this point onwards.
         last_stream_ordering = self.store.get_room_max_stream_ordering()
 
+        # Before we actually persist the pusher, we check if the user already has one
+        # for this app ID and pushkey. If so, we want to keep the access token and
+        # device ID in place, since this could be one device modifying
+        # (e.g. enabling/disabling) another device's pusher.
+        # XXX(quenting): Even though we're not persisting the access_token_id for new
+        # pushers anymore, we still need to copy existing access_token_ids over when
+        # updating a pusher, in case the "set_device_id_for_pushers" background update
+        # hasn't run yet.
+        access_token_id = None
+        existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
+            user_id, app_id, pushkey
+        )
+        if existing_config:
+            device_id = existing_config.device_id
+            access_token_id = existing_config.access_token
+
         # we try to create the pusher just to validate the config: it
         # will then get pulled out of the database,
         # recreated, added and started: this means we have only one
@@ -136,7 +151,6 @@ class PusherPool:
             PusherConfig(
                 id=None,
                 user_name=user_id,
-                access_token=access_token,
                 profile_tag=profile_tag,
                 kind=kind,
                 app_id=app_id,
@@ -151,23 +165,12 @@ class PusherPool:
                 failing_since=None,
                 enabled=enabled,
                 device_id=device_id,
+                access_token=access_token_id,
             )
         )
 
-        # Before we actually persist the pusher, we check if the user already has one
-        # this app ID and pushkey. If so, we want to keep the access token and device ID
-        # in place, since this could be one device modifying (e.g. enabling/disabling)
-        # another device's pusher.
-        existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
-            user_id, app_id, pushkey
-        )
-        if existing_config:
-            access_token = existing_config.access_token
-            device_id = existing_config.device_id
-
         await self.store.add_pusher(
             user_id=user_id,
-            access_token=access_token,
             kind=kind,
             app_id=app_id,
             app_display_name=app_display_name,
@@ -180,6 +183,7 @@ class PusherPool:
             profile_tag=profile_tag,
             enabled=enabled,
             device_id=device_id,
+            access_token_id=access_token_id,
         )
         pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id)
 
@@ -199,7 +203,7 @@ class PusherPool:
                 )
                 await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
 
-    async def remove_pushers_by_access_token(
+    async def remove_pushers_by_access_tokens(
         self, user_id: str, access_tokens: Iterable[int]
     ) -> None:
         """Remove the pushers for a given user corresponding to a set of
@@ -209,6 +213,8 @@ class PusherPool:
             user_id: user to remove pushers for
             access_tokens: access token *ids* to remove pushers for
         """
+        # XXX(quenting): This is only needed until the "set_device_id_for_pushers"
+        # background update finishes
         tokens = set(access_tokens)
         for p in await self.store.get_pushers_by_user_id(user_id):
             if p.access_token in tokens:
@@ -220,6 +226,26 @@ class PusherPool:
                 )
                 await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
 
+    async def remove_pushers_by_devices(
+        self, user_id: str, devices: StrCollection
+    ) -> None:
+        """Remove the pushers for a given user corresponding to a set of devices
+
+        Args:
+            user_id: user to remove pushers for
+            devices: device IDs to remove pushers for
+        """
+        device_ids = set(devices)
+        for p in await self.store.get_pushers_by_user_id(user_id):
+            if p.device_id in device_ids:
+                logger.info(
+                    "Removing pusher for app id %s, pushkey %s, user %s",
+                    p.app_id,
+                    p.pushkey,
+                    p.user_name,
+                )
+                await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
+
     def on_new_notifications(self, max_token: RoomStreamToken) -> None:
         if not self.pushers:
             # nothing to do here.