diff --git a/changelog.d/15280.misc b/changelog.d/15280.misc
new file mode 100644
index 0000000000..41d56b0cf0
--- /dev/null
+++ b/changelog.d/15280.misc
@@ -0,0 +1 @@
+Make the pushers rely on the `device_id` instead of the `access_token_id` for various operations.
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
index 94b86c1d6f..1dcb397ba4 100755
--- a/synapse/_scripts/synapse_port_db.py
+++ b/synapse/_scripts/synapse_port_db.py
@@ -68,7 +68,10 @@ from synapse.storage.databases.main.media_repository import (
MediaRepositoryBackgroundUpdateStore,
)
from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
-from synapse.storage.databases.main.pusher import PusherWorkerStore
+from synapse.storage.databases.main.pusher import (
+ PusherBackgroundUpdatesStore,
+ PusherWorkerStore,
+)
from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore
from synapse.storage.databases.main.registration import (
RegistrationBackgroundUpdateStore,
@@ -226,6 +229,7 @@ class Store(
AccountDataWorkerStore,
PushRuleStore,
PusherWorkerStore,
+ PusherBackgroundUpdatesStore,
PresenceBackgroundUpdateStore,
ReceiptsBackgroundUpdateStore,
RelationsWorkerStore,
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 308e38edea..1e89447044 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -1504,8 +1504,10 @@ class AuthHandler:
)
# delete pushers associated with this access token
+ # XXX(quenting): This is only needed until the 'set_device_id_for_pushers'
+ # background update completes.
if token.token_id is not None:
- await self.hs.get_pusherpool().remove_pushers_by_access_token(
+ await self.hs.get_pusherpool().remove_pushers_by_access_tokens(
token.user_id, (token.token_id,)
)
@@ -1535,7 +1537,9 @@ class AuthHandler:
)
# delete pushers associated with the access tokens
- await self.hs.get_pusherpool().remove_pushers_by_access_token(
+ # XXX(quenting): This is only needed until the 'set_device_id_for_pushers'
+ # background update completes.
+ await self.hs.get_pusherpool().remove_pushers_by_access_tokens(
user_id, (token_id for _, token_id, _ in tokens_and_devices)
)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 6f7963df43..9ded6389ac 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -503,6 +503,8 @@ class DeviceHandler(DeviceWorkerHandler):
else:
raise
+ await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids)
+
# Delete data specific to each device. Not optimised as it is not
# considered as part of a critical path.
for device_id in device_ids:
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 6b110dcb6e..c8bf2439af 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -1013,11 +1013,11 @@ class RegistrationHandler:
user_tuple = await self.store.get_user_by_access_token(token)
# The token better still exist.
assert user_tuple
- token_id = user_tuple.token_id
+ device_id = user_tuple.device_id
await self.pusher_pool.add_or_update_pusher(
user_id=user_id,
- access_token=token_id,
+ device_id=device_id,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index a0c760239d..9e3a98741a 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -103,7 +103,7 @@ class PusherConfig:
id: Optional[str]
user_name: str
- access_token: Optional[int]
+
profile_tag: str
kind: str
app_id: str
@@ -119,6 +119,11 @@ class PusherConfig:
enabled: bool
device_id: Optional[str]
+ # XXX(quenting): The access_token is not persisted anymore for new pushers, but we
+ # keep it when reading from the database, so that we don't get stale pushers
+ # while the "set_device_id_for_pushers" background update is running.
+ access_token: Optional[int]
+
def as_dict(self) -> Dict[str, Any]:
"""Information that can be retrieved about a pusher after creation."""
return {
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index e2648cbc93..6517e3566f 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -25,7 +25,7 @@ from synapse.metrics.background_process_metrics import (
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.push.pusher import PusherFactory
from synapse.replication.http.push import ReplicationRemovePusherRestServlet
-from synapse.types import JsonDict, RoomStreamToken
+from synapse.types import JsonDict, RoomStreamToken, StrCollection
from synapse.util.async_helpers import concurrently_execute
from synapse.util.threepids import canonicalise_email
@@ -97,7 +97,6 @@ class PusherPool:
async def add_or_update_pusher(
self,
user_id: str,
- access_token: Optional[int],
kind: str,
app_id: str,
app_display_name: str,
@@ -128,6 +127,22 @@ class PusherPool:
# stream ordering, so it will process pushes from this point onwards.
last_stream_ordering = self.store.get_room_max_stream_ordering()
+ # Before we actually persist the pusher, we check if the user already has one
+ # for this app ID and pushkey. If so, we want to keep the access token and
+ # device ID in place, since this could be one device modifying
+ # (e.g. enabling/disabling) another device's pusher.
+ # XXX(quenting): Even though we're not persisting the access_token_id for new
+ # pushers anymore, we still need to copy existing access_token_ids over when
+ # updating a pusher, in case the "set_device_id_for_pushers" background update
+ # hasn't run yet.
+ access_token_id = None
+ existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
+ user_id, app_id, pushkey
+ )
+ if existing_config:
+ device_id = existing_config.device_id
+ access_token_id = existing_config.access_token
+
# we try to create the pusher just to validate the config: it
# will then get pulled out of the database,
# recreated, added and started: this means we have only one
@@ -136,7 +151,6 @@ class PusherPool:
PusherConfig(
id=None,
user_name=user_id,
- access_token=access_token,
profile_tag=profile_tag,
kind=kind,
app_id=app_id,
@@ -151,23 +165,12 @@ class PusherPool:
failing_since=None,
enabled=enabled,
device_id=device_id,
+ access_token=access_token_id,
)
)
- # Before we actually persist the pusher, we check if the user already has one
- # this app ID and pushkey. If so, we want to keep the access token and device ID
- # in place, since this could be one device modifying (e.g. enabling/disabling)
- # another device's pusher.
- existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
- user_id, app_id, pushkey
- )
- if existing_config:
- access_token = existing_config.access_token
- device_id = existing_config.device_id
-
await self.store.add_pusher(
user_id=user_id,
- access_token=access_token,
kind=kind,
app_id=app_id,
app_display_name=app_display_name,
@@ -180,6 +183,7 @@ class PusherPool:
profile_tag=profile_tag,
enabled=enabled,
device_id=device_id,
+ access_token_id=access_token_id,
)
pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id)
@@ -199,7 +203,7 @@ class PusherPool:
)
await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
- async def remove_pushers_by_access_token(
+ async def remove_pushers_by_access_tokens(
self, user_id: str, access_tokens: Iterable[int]
) -> None:
"""Remove the pushers for a given user corresponding to a set of
@@ -209,6 +213,8 @@ class PusherPool:
user_id: user to remove pushers for
access_tokens: access token *ids* to remove pushers for
"""
+ # XXX(quenting): This is only needed until the "set_device_id_for_pushers"
+ # background update finishes
tokens = set(access_tokens)
for p in await self.store.get_pushers_by_user_id(user_id):
if p.access_token in tokens:
@@ -220,6 +226,26 @@ class PusherPool:
)
await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
+ async def remove_pushers_by_devices(
+ self, user_id: str, devices: StrCollection
+ ) -> None:
+ """Remove the pushers for a given user corresponding to a set of devices
+
+ Args:
+ user_id: user to remove pushers for
+ devices: device IDs to remove pushers for
+ """
+ device_ids = set(devices)
+ for p in await self.store.get_pushers_by_user_id(user_id):
+ if p.device_id in device_ids:
+ logger.info(
+ "Removing pusher for app id %s, pushkey %s, user %s",
+ p.app_id,
+ p.pushkey,
+ p.user_name,
+ )
+ await self.remove_pusher(p.app_id, p.pushkey, p.user_name)
+
def on_new_notifications(self, max_token: RoomStreamToken) -> None:
if not self.pushers:
# nothing to do here.
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 281e8fd0ad..331f225116 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -425,7 +425,6 @@ class UserRestServletV2(RestServlet):
):
await self.pusher_pool.add_or_update_pusher(
user_id=user_id,
- access_token=None,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
diff --git a/synapse/rest/client/pusher.py b/synapse/rest/client/pusher.py
index 975eef2144..1a8f5292ac 100644
--- a/synapse/rest/client/pusher.py
+++ b/synapse/rest/client/pusher.py
@@ -126,7 +126,6 @@ class PushersSetRestServlet(RestServlet):
try:
await self.pusher_pool.add_or_update_pusher(
user_id=user.to_string(),
- access_token=requester.access_token_id,
kind=content["kind"],
app_id=content["app_id"],
app_display_name=content["app_display_name"],
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 9a24f7a655..ab76b754e0 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -509,19 +509,24 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
async def _set_device_id_for_pushers(
self, progress: JsonDict, batch_size: int
) -> int:
- """Background update to populate the device_id column of the pushers table."""
+ """
+ Background update to populate the device_id column and clear the access_token
+ column for the pushers table.
+ """
last_pusher_id = progress.get("pusher_id", 0)
def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int:
txn.execute(
"""
- SELECT p.id, at.device_id
+ SELECT
+ p.id AS pusher_id,
+ p.device_id AS pusher_device_id,
+ at.device_id AS token_device_id
FROM pushers AS p
- INNER JOIN access_tokens AS at
+ LEFT JOIN access_tokens AS at
ON p.access_token = at.id
WHERE
p.access_token IS NOT NULL
- AND at.device_id IS NOT NULL
AND p.id > ?
ORDER BY p.id
LIMIT ?
@@ -533,13 +538,27 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
if len(rows) == 0:
return 0
+ # The reason we're clearing the access_token column here is a bit subtle.
+ # When a user logs out, we:
+ # (1) delete the access token
+ # (2) delete the device
+ #
+ # Ideally, we would delete the pushers only via its link to the device
+ # during (2), but since this background update might not have fully run yet,
+ # we're still deleting the pushers via the access token during (1).
self.db_pool.simple_update_many_txn(
txn=txn,
table="pushers",
key_names=("id",),
- key_values=[(row["id"],) for row in rows],
- value_names=("device_id",),
- value_values=[(row["device_id"],) for row in rows],
+ key_values=[(row["pusher_id"],) for row in rows],
+ value_names=("device_id", "access_token"),
+ # If there was already a device_id on the pusher, we only want to clear
+ # the access_token column, so we keep the existing device_id. Otherwise,
+ # we set the device_id we got from joining the access_tokens table.
+ value_values=[
+ (row["pusher_device_id"] or row["token_device_id"], None)
+ for row in rows
+ ],
)
self.db_pool.updates._background_update_progress_txn(
@@ -568,7 +587,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
async def add_pusher(
self,
user_id: str,
- access_token: Optional[int],
kind: str,
app_id: str,
app_display_name: str,
@@ -581,13 +599,13 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
profile_tag: str = "",
enabled: bool = True,
device_id: Optional[str] = None,
+ access_token_id: Optional[int] = None,
) -> None:
async with self._pushers_id_gen.get_next() as stream_id:
await self.db_pool.simple_upsert(
table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
values={
- "access_token": access_token,
"kind": kind,
"app_display_name": app_display_name,
"device_display_name": device_display_name,
@@ -599,6 +617,10 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
"id": stream_id,
"enabled": enabled,
"device_id": device_id,
+ # XXX(quenting): We're only really persisting the access token ID
+ # when updating an existing pusher. This is in case the
+ # 'set_device_id_for_pushers' background update hasn't finished yet.
+ "access_token": access_token_id,
},
desc="add_pusher",
)
diff --git a/synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql b/synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql
new file mode 100644
index 0000000000..1367fb6267
--- /dev/null
+++ b/synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql
@@ -0,0 +1,19 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Triggers the background update to set the device_id for pushers
+-- that don't have one, and clear the access_token column.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (7402, 'set_device_id_for_pushers', '{}');
diff --git a/tests/push/test_email.py b/tests/push/test_email.py
index 4ea5472eb4..4b5c96aeae 100644
--- a/tests/push/test_email.py
+++ b/tests/push/test_email.py
@@ -105,7 +105,7 @@ class EmailPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(self.access_token)
)
assert user_tuple is not None
- self.token_id = user_tuple.token_id
+ self.device_id = user_tuple.device_id
# We need to add email to account before we can create a pusher.
self.get_success(
@@ -117,7 +117,7 @@ class EmailPusherTests(HomeserverTestCase):
pusher = self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=self.user_id,
- access_token=self.token_id,
+ device_id=self.device_id,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
@@ -141,7 +141,7 @@ class EmailPusherTests(HomeserverTestCase):
self.get_success_or_raise(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=self.user_id,
- access_token=self.token_id,
+ device_id=self.device_id,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
diff --git a/tests/push/test_http.py b/tests/push/test_http.py
index c280ddcdf6..99cec0836b 100644
--- a/tests/push/test_http.py
+++ b/tests/push/test_http.py
@@ -67,13 +67,13 @@ class HTTPPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
- token_id = user_tuple.token_id
+ device_id = user_tuple.device_id
def test_data(data: Any) -> None:
self.get_failure(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
- access_token=token_id,
+ device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
@@ -114,12 +114,12 @@ class HTTPPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
- token_id = user_tuple.token_id
+ device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
- access_token=token_id,
+ device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
@@ -235,12 +235,12 @@ class HTTPPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
- token_id = user_tuple.token_id
+ device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
- access_token=token_id,
+ device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
@@ -356,12 +356,12 @@ class HTTPPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
- token_id = user_tuple.token_id
+ device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
- access_token=token_id,
+ device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
@@ -443,12 +443,12 @@ class HTTPPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
- token_id = user_tuple.token_id
+ device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
- access_token=token_id,
+ device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
@@ -521,12 +521,12 @@ class HTTPPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
- token_id = user_tuple.token_id
+ device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
- access_token=token_id,
+ device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
@@ -628,12 +628,12 @@ class HTTPPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
- token_id = user_tuple.token_id
+ device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
- access_token=token_id,
+ device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
@@ -764,12 +764,12 @@ class HTTPPusherTests(HomeserverTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
- token_id = user_tuple.token_id
+ device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
- access_token=token_id,
+ device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
@@ -778,7 +778,6 @@ class HTTPPusherTests(HomeserverTestCase):
lang=None,
data={"url": "http://example.com/_matrix/push/v1/notify"},
enabled=enabled,
- device_id=user_tuple.device_id,
)
)
@@ -895,19 +894,17 @@ class HTTPPusherTests(HomeserverTestCase):
def test_update_different_device_access_token_device_id(self) -> None:
"""Tests that if we create a pusher from one device, the update it from another
- device, the access token and device ID associated with the pusher stays the
- same.
+ device, the device ID associated with the pusher stays the same.
"""
# Create a user with a pusher.
user_id, access_token = self._make_user_with_pusher("user")
- # Get the token ID for the current access token, since that's what we store in
- # the pushers table. Also get the device ID from it.
+ # Get the device ID for the current access token, since that's what we store in
+ # the pushers table.
user_tuple = self.get_success(
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
- token_id = user_tuple.token_id
device_id = user_tuple.device_id
# Generate a new access token, and update the pusher with it.
@@ -920,10 +917,9 @@ class HTTPPusherTests(HomeserverTestCase):
)
pushers: List[PusherConfig] = list(ret)
- # Check that we still have one pusher, and that the access token and device ID
- # associated with it didn't change.
+ # Check that we still have one pusher, and that the device ID associated with
+ # it didn't change.
self.assertEqual(len(pushers), 1)
- self.assertEqual(pushers[0].access_token, token_id)
self.assertEqual(pushers[0].device_id, device_id)
@override_config({"experimental_features": {"msc3881_enabled": True}})
diff --git a/tests/replication/test_pusher_shard.py b/tests/replication/test_pusher_shard.py
index 0798b021c3..dcb3e6669b 100644
--- a/tests/replication/test_pusher_shard.py
+++ b/tests/replication/test_pusher_shard.py
@@ -51,12 +51,12 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_dict is not None
- token_id = user_dict.token_id
+ device_id = user_dict.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
- access_token=token_id,
+ device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index 4b8f889a71..b4241ceaf0 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -3047,12 +3047,12 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
self.store.get_user_by_access_token(other_user_token)
)
assert user_tuple is not None
- token_id = user_tuple.token_id
+ device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=self.other_user,
- access_token=token_id,
+ device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
|