diff options
Diffstat (limited to 'synapse/push')
-rw-r--r-- | synapse/push/__init__.py | 7 | ||||
-rw-r--r-- | synapse/push/pusherpool.py | 58 |
2 files changed, 48 insertions, 17 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index a0c760239d..9e3a98741a 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -103,7 +103,7 @@ class PusherConfig: id: Optional[str] user_name: str - access_token: Optional[int] + profile_tag: str kind: str app_id: str @@ -119,6 +119,11 @@ class PusherConfig: enabled: bool device_id: Optional[str] + # XXX(quenting): The access_token is not persisted anymore for new pushers, but we + # keep it when reading from the database, so that we don't get stale pushers + # while the "set_device_id_for_pushers" background update is running. + access_token: Optional[int] + def as_dict(self) -> Dict[str, Any]: """Information that can be retrieved about a pusher after creation.""" return { diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index e2648cbc93..6517e3566f 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -25,7 +25,7 @@ from synapse.metrics.background_process_metrics import ( from synapse.push import Pusher, PusherConfig, PusherConfigException from synapse.push.pusher import PusherFactory from synapse.replication.http.push import ReplicationRemovePusherRestServlet -from synapse.types import JsonDict, RoomStreamToken +from synapse.types import JsonDict, RoomStreamToken, StrCollection from synapse.util.async_helpers import concurrently_execute from synapse.util.threepids import canonicalise_email @@ -97,7 +97,6 @@ class PusherPool: async def add_or_update_pusher( self, user_id: str, - access_token: Optional[int], kind: str, app_id: str, app_display_name: str, @@ -128,6 +127,22 @@ class PusherPool: # stream ordering, so it will process pushes from this point onwards. last_stream_ordering = self.store.get_room_max_stream_ordering() + # Before we actually persist the pusher, we check if the user already has one + # for this app ID and pushkey. If so, we want to keep the access token and + # device ID in place, since this could be one device modifying + # (e.g. enabling/disabling) another device's pusher. + # XXX(quenting): Even though we're not persisting the access_token_id for new + # pushers anymore, we still need to copy existing access_token_ids over when + # updating a pusher, in case the "set_device_id_for_pushers" background update + # hasn't run yet. + access_token_id = None + existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey( + user_id, app_id, pushkey + ) + if existing_config: + device_id = existing_config.device_id + access_token_id = existing_config.access_token + # we try to create the pusher just to validate the config: it # will then get pulled out of the database, # recreated, added and started: this means we have only one @@ -136,7 +151,6 @@ class PusherPool: PusherConfig( id=None, user_name=user_id, - access_token=access_token, profile_tag=profile_tag, kind=kind, app_id=app_id, @@ -151,23 +165,12 @@ class PusherPool: failing_since=None, enabled=enabled, device_id=device_id, + access_token=access_token_id, ) ) - # Before we actually persist the pusher, we check if the user already has one - # this app ID and pushkey. If so, we want to keep the access token and device ID - # in place, since this could be one device modifying (e.g. enabling/disabling) - # another device's pusher. - existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey( - user_id, app_id, pushkey - ) - if existing_config: - access_token = existing_config.access_token - device_id = existing_config.device_id - await self.store.add_pusher( user_id=user_id, - access_token=access_token, kind=kind, app_id=app_id, app_display_name=app_display_name, @@ -180,6 +183,7 @@ class PusherPool: profile_tag=profile_tag, enabled=enabled, device_id=device_id, + access_token_id=access_token_id, ) pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id) @@ -199,7 +203,7 @@ class PusherPool: ) await self.remove_pusher(p.app_id, p.pushkey, p.user_name) - async def remove_pushers_by_access_token( + async def remove_pushers_by_access_tokens( self, user_id: str, access_tokens: Iterable[int] ) -> None: """Remove the pushers for a given user corresponding to a set of @@ -209,6 +213,8 @@ class PusherPool: user_id: user to remove pushers for access_tokens: access token *ids* to remove pushers for """ + # XXX(quenting): This is only needed until the "set_device_id_for_pushers" + # background update finishes tokens = set(access_tokens) for p in await self.store.get_pushers_by_user_id(user_id): if p.access_token in tokens: @@ -220,6 +226,26 @@ class PusherPool: ) await self.remove_pusher(p.app_id, p.pushkey, p.user_name) + async def remove_pushers_by_devices( + self, user_id: str, devices: StrCollection + ) -> None: + """Remove the pushers for a given user corresponding to a set of devices + + Args: + user_id: user to remove pushers for + devices: device IDs to remove pushers for + """ + device_ids = set(devices) + for p in await self.store.get_pushers_by_user_id(user_id): + if p.device_id in device_ids: + logger.info( + "Removing pusher for app id %s, pushkey %s, user %s", + p.app_id, + p.pushkey, + p.user_name, + ) + await self.remove_pusher(p.app_id, p.pushkey, p.user_name) + def on_new_notifications(self, max_token: RoomStreamToken) -> None: if not self.pushers: # nothing to do here. |