diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 5503621ad6..7647cda2c6 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1599,6 +1599,72 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
return rows
+ async def check_too_many_devices_for_user(self, user_id: str) -> List[str]:
+ """Check if the user has a lot of devices, and if so return the set of
+ devices we can prune.
+
+ This does *not* return hidden devices or devices with E2E keys.
+ """
+
+ num_devices = await self.db_pool.simple_select_one_onecol(
+ table="devices",
+ keyvalues={"user_id": user_id, "hidden": False},
+ retcol="COALESCE(COUNT(*), 0)",
+ desc="count_devices",
+ )
+
+ # We let users have up to ten devices without pruning.
+ if num_devices <= 10:
+ return []
+
+ # We always prune devices not seen in the last 14 days...
+ max_last_seen = self._clock.time_msec() - 14 * 24 * 60 * 60 * 1000
+
+ # ... but we also cap the maximum number of devices the user can have to
+ # 50.
+ if num_devices > 50:
+ # Choose a last seen that ensures we keep at most 50 devices.
+ sql = """
+ SELECT last_seen FROM devices
+ LEFT JOIN e2e_device_keys_json USING (user_id, device_id)
+ WHERE
+ user_id = ?
+ AND NOT hidden
+ AND last_seen IS NOT NULL
+ AND key_json IS NULL
+ ORDER BY last_seen DESC
+ LIMIT 1
+ OFFSET 50
+ """
+
+ rows = await self.db_pool.execute(
+ "check_too_many_devices_for_user_last_seen", None, sql, user_id
+ )
+ if rows:
+ max_last_seen = max(rows[0][0], max_last_seen)
+
+ # Fetch the devices to delete.
+ sql = """
+ SELECT DISTINCT device_id FROM devices
+ LEFT JOIN e2e_device_keys_json USING (user_id, device_id)
+ WHERE
+ user_id = ?
+ AND NOT hidden
+ AND last_seen < ?
+ AND key_json IS NULL
+ """
+
+ def check_too_many_devices_for_user_txn(
+ txn: LoggingTransaction,
+ ) -> List[str]:
+ txn.execute(sql, (user_id, max_last_seen))
+ return [device_id for device_id, in txn]
+
+ return await self.db_pool.runInteraction(
+ "check_too_many_devices_for_user",
+ check_too_many_devices_for_user_txn,
+ )
+
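A minimal sketch of how a caller might consume the new method, e.g. when a user logs in. The `prune_old_devices` function and the bare `store` parameter are hypothetical names, not part of this patch:

```python
async def prune_old_devices(store, user_id: str) -> None:
    # Only non-hidden devices without E2E keys are returned, so pruning
    # here cannot invalidate anyone's encryption sessions.
    device_ids = await store.check_too_many_devices_for_user(user_id)
    if device_ids:
        await store.delete_devices(user_id, device_ids)
```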
class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
# Because we have write access, this will be a StreamIdGenerator
@@ -1657,6 +1724,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
values={},
insertion_values={
"display_name": initial_device_display_name,
+ "last_seen": self._clock.time_msec(),
"hidden": False,
},
desc="store_device",
@@ -1702,7 +1770,15 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
)
raise StoreError(500, "Problem storing device.")
- async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
+ @cached(max_entries=0)
+ async def delete_device(self, user_id: str, device_id: str) -> None:
+ raise NotImplementedError()
+
+ # Note: sometimes deleting rows out of `device_inbox` can take a long time,
+ # so we use a cache to deduplicate in-flight requests to delete
+ # devices.
+ @cachedList(cached_method_name="delete_device", list_name="device_ids")
+ async def delete_devices(self, user_id: str, device_ids: Collection[str]) -> dict:
"""Deletes several devices.
Args:
@@ -1739,6 +1815,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
for device_id in device_ids:
self.device_id_exists_cache.invalidate((user_id, device_id))
+ return {}
+
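The `@cached(max_entries=0)` stub paired with `@cachedList` above means concurrent `delete_devices` calls for the same device share one in-flight deferred instead of each issuing a slow delete against `device_inbox`. A rough standalone analogue of that deduplication using plain `asyncio` (illustrative only; not Synapse's cache implementation):

```python
import asyncio
from typing import Awaitable, Callable, Dict, Tuple

_inflight: Dict[Tuple[str, str], "asyncio.Task[None]"] = {}

async def delete_device_deduped(
    user_id: str,
    device_id: str,
    do_delete: Callable[[str, str], Awaitable[None]],
) -> None:
    key = (user_id, device_id)
    task = _inflight.get(key)
    if task is None:
        # First caller starts the real (potentially slow) deletion.
        task = asyncio.ensure_future(do_delete(user_id, device_id))
        _inflight[key] = task
        task.add_done_callback(lambda _: _inflight.pop(key, None))
    # Concurrent callers await the same task instead of issuing a second
    # DELETE against device_inbox.
    await task
```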
async def update_device(
self, user_id: str, device_id: str, new_display_name: Optional[str] = None
) -> None:
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index a3b6c8ae8e..dc7768c50c 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -51,7 +51,7 @@ from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import JsonDict
-from synapse.util import json_encoder
+from synapse.util import json_decoder, json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
@@ -1028,14 +1028,17 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
async def claim_e2e_one_time_keys(
self, query_list: Iterable[Tuple[str, str, str]]
- ) -> Dict[str, Dict[str, Dict[str, str]]]:
+ ) -> Tuple[Dict[str, Dict[str, Dict[str, JsonDict]]], List[Tuple[str, str, str]]]:
"""Take a list of one time keys out of the database.
Args:
query_list: An iterable of tuples of (user ID, device ID, algorithm).
Returns:
- A map of user ID -> a map device ID -> a map of key ID -> JSON bytes.
+ A tuple of:
+ A map of user ID -> a map of device ID -> a map of key ID -> JSON.
+
+ The (user ID, device ID, algorithm) triples from the input which could not be fulfilled.
"""
@trace
@@ -1115,7 +1118,8 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
key_id, key_json = otk_row
return f"{algorithm}:{key_id}", key_json
- results: Dict[str, Dict[str, Dict[str, str]]] = {}
+ results: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
+ missing: List[Tuple[str, str, str]] = []
for user_id, device_id, algorithm in query_list:
if self.database_engine.supports_returning:
# If we support RETURNING clause we can use a single query that
@@ -1138,11 +1142,25 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
device_results = results.setdefault(user_id, {}).setdefault(
device_id, {}
)
- device_results[claim_row[0]] = claim_row[1]
- continue
+ device_results[claim_row[0]] = json_decoder.decode(claim_row[1])
+ else:
+ missing.append((user_id, device_id, algorithm))
+
+ return results, missing
+
+ async def claim_e2e_fallback_keys(
+ self, query_list: Iterable[Tuple[str, str, str]]
+ ) -> Dict[str, Dict[str, Dict[str, JsonDict]]]:
+ """Take a list of fallback keys out of the database.
- # No one-time key available, so see if there's a fallback
- # key
+ Args:
+ query_list: An iterable of tuples of (user ID, device ID, algorithm).
+
+ Returns:
+ A map of user ID -> a map of device ID -> a map of key ID -> JSON.
+ """
+ results: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
+ for user_id, device_id, algorithm in query_list:
row = await self.db_pool.simple_select_one(
table="e2e_fallback_keys_json",
keyvalues={
@@ -1179,7 +1197,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
)
device_results = results.setdefault(user_id, {}).setdefault(device_id, {})
- device_results[f"{algorithm}:{key_id}"] = key_json
+ device_results[f"{algorithm}:{key_id}"] = json_decoder.decode(key_json)
return results
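With the claim split into two methods, the expected caller flow is roughly the following sketch (the `claim_keys` wrapper and bare `store` parameter are illustrative, not part of this patch):

```python
async def claim_keys(store, query_list):
    # Try to claim genuine one-time keys first; `missing` holds the
    # (user ID, device ID, algorithm) triples that could not be fulfilled.
    results, missing = await store.claim_e2e_one_time_keys(query_list)
    # Fall back to (reusable) fallback keys for the remainder.
    fallback = await store.claim_e2e_fallback_keys(missing)
    for user_id, device_keys in fallback.items():
        for device_id, keys in device_keys.items():
            results.setdefault(user_id, {}).setdefault(device_id, {}).update(keys)
    return results
```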
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index a8a4ed4436..ccd9f9d141 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -27,6 +27,7 @@ from typing import (
Optional,
Set,
Tuple,
+ cast,
)
import attr
@@ -1126,11 +1127,15 @@ class PersistEventsStore:
# been inserted into room_memberships.
txn.execute_batch(
"""INSERT INTO current_state_events
- (room_id, type, state_key, event_id, membership)
- VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
+ (room_id, type, state_key, event_id, membership, event_stream_ordering)
+ VALUES (
+ ?, ?, ?, ?,
+ (SELECT membership FROM room_memberships WHERE event_id = ?),
+ (SELECT stream_ordering FROM events WHERE event_id = ?)
+ )
""",
[
- (room_id, key[0], key[1], ev_id, ev_id)
+ (room_id, key[0], key[1], ev_id, ev_id, ev_id)
for key, ev_id in to_insert.items()
],
)
@@ -1157,11 +1162,15 @@ class PersistEventsStore:
if to_insert:
txn.execute_batch(
"""INSERT INTO local_current_membership
- (room_id, user_id, event_id, membership)
- VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
+ (room_id, user_id, event_id, membership, event_stream_ordering)
+ VALUES (
+ ?, ?, ?,
+ (SELECT membership FROM room_memberships WHERE event_id = ?),
+ (SELECT stream_ordering FROM events WHERE event_id = ?)
+ )
""",
[
- (room_id, key[1], ev_id, ev_id)
+ (room_id, key[1], ev_id, ev_id, ev_id)
for key, ev_id in to_insert.items()
if key[0] == EventTypes.Member and self.is_mine_id(key[1])
],
@@ -1340,9 +1349,7 @@ class PersistEventsStore:
[event.event_id for event, _ in events_and_contexts],
)
- have_persisted: Dict[str, bool] = {
- event_id: outlier for event_id, outlier in txn
- }
+ have_persisted = dict(cast(Iterable[Tuple[str, bool]], txn))
logger.debug(
"_update_outliers_txn: events=%s have_persisted=%s",
@@ -1769,6 +1776,7 @@ class PersistEventsStore:
table="room_memberships",
keys=(
"event_id",
+ "event_stream_ordering",
"user_id",
"sender",
"room_id",
@@ -1779,6 +1787,7 @@ class PersistEventsStore:
values=[
(
event.event_id,
+ event.internal_metadata.stream_ordering,
event.state_key,
event.user_id,
event.room_id,
@@ -1811,6 +1820,7 @@ class PersistEventsStore:
keyvalues={"room_id": event.room_id, "user_id": event.state_key},
values={
"event_id": event.event_id,
+ "event_stream_ordering": event.internal_metadata.stream_ordering,
"membership": event.membership,
},
)
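These hunks denormalise `events.stream_ordering` into the membership-related tables, populating the new `event_stream_ordering` column atomically via a subselect on the source row. A consistency check one could run by hand, assuming the schema added by the accompanying migration (illustrative, not part of the patch):

```python
# An empty result set means the denormalised column agrees with `events`.
CHECK_SQL = """
    SELECT c.event_id
    FROM current_state_events AS c
    INNER JOIN events AS e USING (event_id)
    WHERE c.event_stream_ordering != e.stream_ordering
"""
```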
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 7a7c0d9c75..efbd3e75d9 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -428,14 +428,16 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
"partial_state_events",
"partial_state_rooms_servers",
"partial_state_rooms",
+ # Note: these membership tables have foreign keys to the `events` table,
+ # so rows must be deleted from them before `events`.
+ "local_current_membership",
+ "room_memberships",
"events",
"federation_inbound_events_staging",
- "local_current_membership",
"receipts_graph",
"receipts_linearized",
"room_aliases",
"room_depth",
- "room_memberships",
"room_stats_state",
"room_stats_current",
"room_stats_earliest_token",
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 9a24f7a655..aeb6034f46 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -509,19 +509,24 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
async def _set_device_id_for_pushers(
self, progress: JsonDict, batch_size: int
) -> int:
- """Background update to populate the device_id column of the pushers table."""
+ """
+ Background update to populate the device_id column and clear the access_token
+ column for the pushers table.
+ """
last_pusher_id = progress.get("pusher_id", 0)
def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int:
txn.execute(
"""
- SELECT p.id, at.device_id
+ SELECT
+ p.id AS pusher_id,
+ p.device_id AS pusher_device_id,
+ at.device_id AS token_device_id
FROM pushers AS p
- INNER JOIN access_tokens AS at
+ LEFT JOIN access_tokens AS at
ON p.access_token = at.id
WHERE
p.access_token IS NOT NULL
- AND at.device_id IS NOT NULL
AND p.id > ?
ORDER BY p.id
LIMIT ?
@@ -533,13 +538,27 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
if len(rows) == 0:
return 0
+ # The reason we're clearing the access_token column here is a bit subtle.
+ # When a user logs out, we:
+ # (1) delete the access token
+ # (2) delete the device
+ #
+ # Ideally, we would delete each pusher only via its link to the device
+ # during (2), but since this background update might not have fully run yet,
+ # we still delete pushers via the access token during (1).
self.db_pool.simple_update_many_txn(
txn=txn,
table="pushers",
key_names=("id",),
- key_values=[(row["id"],) for row in rows],
- value_names=("device_id",),
- value_values=[(row["device_id"],) for row in rows],
+ key_values=[(row["pusher_id"],) for row in rows],
+ value_names=("device_id", "access_token"),
+ # If there was already a device_id on the pusher, we only want to clear
+ # the access_token column, so we keep the existing device_id. Otherwise,
+ # we set the device_id we got from joining the access_tokens table.
+ value_values=[
+ (row["pusher_device_id"] or row["token_device_id"], None)
+ for row in rows
+ ],
)
self.db_pool.updates._background_update_progress_txn(
@@ -568,7 +587,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
async def add_pusher(
self,
user_id: str,
- access_token: Optional[int],
kind: str,
app_id: str,
app_display_name: str,
@@ -581,13 +599,13 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
profile_tag: str = "",
enabled: bool = True,
device_id: Optional[str] = None,
+ access_token_id: Optional[int] = None,
) -> None:
async with self._pushers_id_gen.get_next() as stream_id:
await self.db_pool.simple_upsert(
table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
values={
- "access_token": access_token,
"kind": kind,
"app_display_name": app_display_name,
"device_display_name": device_display_name,
@@ -599,6 +617,10 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
"id": stream_id,
"enabled": enabled,
"device_id": device_id,
+ # XXX(quenting): We're only really persisting the access token ID
+ # when updating an existing pusher. This is in case the
+ # 'set_device_id_for_pushers' background update hasn't finished yet.
+ "access_token": access_token_id,
},
desc="add_pusher",
)
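A hedged sketch of the migrated call site: `device_id` is now the primary link, with `access_token_id` kept only as a transitional fallback while the `set_device_id_for_pushers` background update may still be running. Parameter names not visible in this hunk (`pushkey_ts`, `lang`, `data`, `last_stream_ordering`) are assumed from the pre-existing signature:

```python
async def register_http_pusher(store, user_id, device_id, access_token_id, now_ms):
    await store.add_pusher(
        user_id=user_id,
        kind="http",
        app_id="m.example.app",
        app_display_name="Example App",
        device_display_name="Alice's phone",
        pushkey="abc123",
        pushkey_ts=now_ms,
        lang="en",
        data={"url": "https://push.example.org/_matrix/push/v1/notify"},
        last_stream_ordering=0,
        device_id=device_id,
        access_token_id=access_token_id,
    )
```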
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index bc3a83919c..3955a8a9a5 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -472,12 +472,11 @@ class RelationsWorkerStore(SQLBaseStore):
the event will map to None.
"""
- # We only allow edits for `m.room.message` events that have the same sender
- # and event type. We can't assert these things during regular event auth so
- # we have to do the checks post hoc.
+ # We only allow edits for events that have the same sender and event type.
+ # We can't assert these things during regular event auth so we have to do
+ # the checks post hoc.
- # Fetches latest edit that has the same type and sender as the
- # original, and is an `m.room.message`.
+ # Fetches latest edit that has the same type and sender as the original.
if isinstance(self.database_engine, PostgresEngine):
# The `DISTINCT ON` clause will pick the *first* row it encounters,
# so ordering by origin server ts + event ID desc will ensure we get
@@ -493,7 +492,6 @@ class RelationsWorkerStore(SQLBaseStore):
WHERE
%s
AND relation_type = ?
- AND edit.type = 'm.room.message'
ORDER by original.event_id DESC, edit.origin_server_ts DESC, edit.event_id DESC
"""
else:
@@ -512,7 +510,6 @@ class RelationsWorkerStore(SQLBaseStore):
WHERE
%s
AND relation_type = ?
- AND edit.type = 'm.room.message'
ORDER by edit.origin_server_ts, edit.event_id
"""
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index d3393d8e49..97c4dc2603 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -16,7 +16,17 @@
import logging
from enum import Enum
from itertools import chain
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Dict,
+ Iterable,
+ List,
+ Optional,
+ Tuple,
+ Union,
+ cast,
+)
from typing_extensions import Counter
@@ -523,7 +533,7 @@ class StatsStore(StateDeltasStore):
""",
(room_id,),
)
- membership_counts = {membership: cnt for membership, cnt in txn}
+ membership_counts = dict(cast(Iterable[Tuple[str, int]], txn))
txn.execute(
"""
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index ac5fbf6b86..92cbe262a6 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -41,6 +41,7 @@ from typing import (
Any,
Collection,
Dict,
+ Iterable,
List,
Optional,
Set,
@@ -50,7 +51,7 @@ from typing import (
)
import attr
-from frozendict import frozendict
+from immutabledict import immutabledict
from typing_extensions import Literal
from twisted.internet import defer
@@ -557,7 +558,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if p > min_pos
}
- return RoomStreamToken(None, min_pos, frozendict(positions))
+ return RoomStreamToken(None, min_pos, immutabledict(positions))
async def get_room_events_stream_for_rooms(
self,
@@ -1343,7 +1344,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
GROUP BY type
"""
txn.execute(sql)
- min_positions = {typ: pos for typ, pos in txn} # Map from type -> min position
+ min_positions = dict(
+ cast(Iterable[Tuple[str, int]], txn)
+ ) # Map from type -> min position
# Ensure we do actually have some values here
assert set(min_positions) == {"federation", "events"}
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 97f09b73dd..9fced4b997 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -698,10 +698,17 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
"""Delete the entire user directory"""
def _delete_all_from_user_dir_txn(txn: LoggingTransaction) -> None:
- txn.execute("DELETE FROM user_directory")
- txn.execute("DELETE FROM user_directory_search")
- txn.execute("DELETE FROM users_in_public_rooms")
- txn.execute("DELETE FROM users_who_share_private_rooms")
+ # SQLite doesn't support TRUNCATE.
+ # On Postgres, DELETE FROM does a table scan but TRUNCATE is more efficient.
+ truncate = (
+ "DELETE FROM"
+ if isinstance(self.database_engine, Sqlite3Engine)
+ else "TRUNCATE"
+ )
+ txn.execute(f"{truncate} user_directory")
+ txn.execute(f"{truncate} user_directory_search")
+ txn.execute(f"{truncate} users_in_public_rooms")
+ txn.execute(f"{truncate} users_who_share_private_rooms")
txn.call_after(self.get_user_in_directory.invalidate_all)
await self.db_pool.runInteraction(
|