diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index e2648cbc93..6517e3566f 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -25,7 +25,7 @@ from synapse.metrics.background_process_metrics import (
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.push.pusher import PusherFactory
from synapse.replication.http.push import ReplicationRemovePusherRestServlet
-from synapse.types import JsonDict, RoomStreamToken
+from synapse.types import JsonDict, RoomStreamToken, StrCollection
from synapse.util.async_helpers import concurrently_execute
from synapse.util.threepids import canonicalise_email
@@ -97,7 +97,6 @@ class PusherPool:
async def add_or_update_pusher(
self,
user_id: str,
- access_token: Optional[int],
kind: str,
app_id: str,
app_display_name: str,
@@ -128,6 +127,22 @@ class PusherPool:
# stream ordering, so it will process pushes from this point onwards.
last_stream_ordering = self.store.get_room_max_stream_ordering()
+ # Before we actually persist the pusher, we check if the user already has one
+ # for this app ID and pushkey. If so, we want to keep the access token and
+ # device ID in place, since this could be one device modifying
+ # (e.g. enabling/disabling) another device's pusher.
+ # XXX(quenting): Even though we're not persisting the access_token_id for new
+ # pushers anymore, we still need to copy existing access_token_ids over when
+ # updating a pusher, in case the "set_device_id_for_pushers" background update
+ # hasn't run yet.
+ access_token_id = None
+ existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
+ user_id, app_id, pushkey
+ )
+ if existing_config:
+ device_id = existing_config.device_id
+ access_token_id = existing_config.access_token
+
# we try to create the pusher just to validate the config: it
# will then get pulled out of the database,
# recreated, added and started: this means we have only one
@@ -136,7 +151,6 @@ class PusherPool:
PusherConfig(
id=None,
user_name=user_id,
- access_token=access_token,
profile_tag=profile_tag,
kind=kind,
app_id=app_id,
@@ -151,23 +165,12 @@ class PusherPool:
failing_since=None,
enabled=enabled,
device_id=device_id,
+ access_token=access_token_id,
)
)
- # Before we actually persist the pusher, we check if the user already has one
- # this app ID and pushkey. If so, we want to keep the access token and device ID
- # in place, since this could be one device modifying (e.g. enabling/disabling)
- # another device's pusher.
- existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
- user_id, app_id, pushkey
- )
- if existing_config:
- access_token = existing_config.access_token
- device_id = existing_config.device_id
-
await self.store.add_pusher(
user_id=user_id,
- access_token=access_token,
kind=kind,
app_id=app_id,
app_display_name=app_display_name,
@@ -180,6 +183,7 @@ class PusherPool:
profile_tag=profile_tag,
enabled=enabled,
device_id=device_id,
+ access_token_id=access_token_id,
)
pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id)
@@ -199,7 +203,7 @@ class PusherPool:
)
await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
- async def remove_pushers_by_access_token(
+ async def remove_pushers_by_access_tokens(
self, user_id: str, access_tokens: Iterable[int]
) -> None:
"""Remove the pushers for a given user corresponding to a set of
@@ -209,6 +213,8 @@ class PusherPool:
user_id: user to remove pushers for
access_tokens: access token *ids* to remove pushers for
"""
+ # XXX(quenting): This is only needed until the "set_device_id_for_pushers"
+ # background update finishes
tokens = set(access_tokens)
for p in await self.store.get_pushers_by_user_id(user_id):
if p.access_token in tokens:
@@ -220,6 +226,26 @@ class PusherPool:
)
await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
+ async def remove_pushers_by_devices(
+ self, user_id: str, devices: StrCollection
+ ) -> None:
+ """Remove the pushers for a given user corresponding to a set of devices
+
+ Args:
+ user_id: user to remove pushers for
+ devices: device IDs to remove pushers for
+ """
+ device_ids = set(devices)
+ for p in await self.store.get_pushers_by_user_id(user_id):
+ if p.device_id in device_ids:
+ logger.info(
+ "Removing pusher for app id %s, pushkey %s, user %s",
+ p.app_id,
+ p.pushkey,
+ p.user_name,
+ )
+ await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
+
def on_new_notifications(self, max_token: RoomStreamToken) -> None:
if not self.pushers:
# nothing to do here.
|