diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 70b49854cf..1d44c3aa2c 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -16,7 +16,7 @@
# limitations under the License.
import logging
-from typing import Any, Dict, List, Optional, Tuple
+from typing import List, Optional, Tuple
from synapse.api.constants import PresenceState
from synapse.config.homeserver import HomeServerConfig
@@ -27,7 +27,7 @@ from synapse.storage.util.id_generators import (
MultiWriterIdGenerator,
StreamIdGenerator,
)
-from synapse.types import get_domain_from_id
+from synapse.types import JsonDict, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from .account_data import AccountDataStore
@@ -264,7 +264,7 @@ class DataStore(
return [UserPresenceState(**row) for row in rows]
- async def get_users(self) -> List[Dict[str, Any]]:
+ async def get_users(self) -> List[JsonDict]:
"""Function to retrieve a list of users in users table.
Returns:
@@ -292,7 +292,7 @@ class DataStore(
name: Optional[str] = None,
guests: bool = True,
deactivated: bool = False,
- ) -> Tuple[List[Dict[str, Any]], int]:
+ ) -> Tuple[List[JsonDict], int]:
"""Function to retrieve a paginated list of users from
users list. This will return a json list of users and the
total number of users matching the filter criteria.
@@ -353,7 +353,7 @@ class DataStore(
"get_users_paginate_txn", get_users_paginate_txn
)
- async def search_users(self, term: str) -> Optional[List[Dict[str, Any]]]:
+ async def search_users(self, term: str) -> Optional[List[JsonDict]]:
"""Function to search users list for one or more users with
the matched term.
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index c1626ccf28..cb6b1f8a0c 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -696,7 +696,9 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
)
if not has_event_auth:
- for auth_id in event.auth_event_ids():
+ # Old, dodgy events may have duplicate auth events, which we
+ # need to deduplicate as we have a unique constraint.
+ for auth_id in set(event.auth_event_ids()):
auth_events.append(
{
"room_id": event.room_id,
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index 274f8de595..4f3d192562 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -139,7 +139,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
start: int,
limit: int,
user_id: str,
- order_by: MediaSortOrder = MediaSortOrder.CREATED_TS.value,
+ order_by: str = MediaSortOrder.CREATED_TS.value,
direction: str = "f",
) -> Tuple[List[Dict[str, Any]], int]:
"""Get a paginated list of metadata for a local piece of media
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index ecfc9f20b1..0836e4af49 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -28,7 +28,10 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
async def purge_history(
self, room_id: str, token: str, delete_local_events: bool
) -> Set[int]:
- """Deletes room history before a certain point
+ """Deletes room history before a certain point.
+
+ Note that only a single purge can occur at once; this is guaranteed via
+ a higher level (in the PaginationHandler).
Args:
room_id:
@@ -52,7 +55,9 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
delete_local_events,
)
- def _purge_history_txn(self, txn, room_id, token, delete_local_events):
+ def _purge_history_txn(
+ self, txn, room_id: str, token: RoomStreamToken, delete_local_events: bool
+ ) -> Set[int]:
# Tables that should be pruned:
# event_auth
# event_backward_extremities
@@ -103,7 +108,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
if max_depth < token.topological:
# We need to ensure we don't delete all the events from the database
# otherwise we wouldn't be able to send any events (due to not
- # having any backwards extremeties)
+ # having any backwards extremities)
raise SynapseError(
400, "topological_ordering is greater than forward extremeties"
)
@@ -154,7 +159,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
logger.info("[purge] Finding new backward extremities")
- # We calculate the new entries for the backward extremeties by finding
+ # We calculate the new entries for the backward extremities by finding
# events to be purged that are pointed to by events we're not going to
# purge.
txn.execute(
@@ -296,7 +301,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
"purge_room", self._purge_room_txn, room_id
)
- def _purge_room_txn(self, txn, room_id):
+ def _purge_room_txn(self, txn, room_id: str) -> List[int]:
# First we fetch all the state groups that should be deleted, before
# we delete that information.
txn.execute(
@@ -310,6 +315,31 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
state_groups = [row[0] for row in txn]
+ # Get all the auth chains that are referenced by events that are to be
+ # deleted.
+ txn.execute(
+ """
+ SELECT chain_id, sequence_number FROM events
+ LEFT JOIN event_auth_chains USING (event_id)
+ WHERE room_id = ?
+ """,
+ (room_id,),
+ )
+ referenced_chain_id_tuples = list(txn)
+
+ logger.info("[purge] removing events from event_auth_chain_links")
+ txn.executemany(
+ """
+ DELETE FROM event_auth_chain_links WHERE
+ (origin_chain_id = ? AND origin_sequence_number = ?) OR
+ (target_chain_id = ? AND target_sequence_number = ?)
+ """,
+ (
+ (chain_id, seq_num, chain_id, seq_num)
+ for (chain_id, seq_num) in referenced_chain_id_tuples
+ ),
+ )
+
# Now we delete tables which lack an index on room_id but have one on event_id
for table in (
"event_auth",
@@ -319,6 +349,8 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
"event_reference_hashes",
"event_relations",
"event_to_state_groups",
+ "event_auth_chains",
+ "event_auth_chain_to_calculate",
"redactions",
"rejections",
"state_events",
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 6b608ebc9b..85f1ebac98 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -44,6 +44,11 @@ class PusherWorkerStore(SQLBaseStore):
self._remove_deactivated_pushers,
)
+ self.db_pool.updates.register_background_update_handler(
+ "remove_stale_pushers",
+ self._remove_stale_pushers,
+ )
+
def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
"""JSON-decode the data in the rows returned from the `pushers` table
@@ -337,6 +342,53 @@ class PusherWorkerStore(SQLBaseStore):
return number_deleted
+ async def _remove_stale_pushers(self, progress: dict, batch_size: int) -> int:
+ """A background update that deletes all pushers for logged out devices.
+
+ Note that we don't proactively tell the pusherpool that we've deleted
+ these (just because it's a bit of a faff to do from here), but they will
+ get cleaned up at the next restart
+ """
+
+ last_pusher = progress.get("last_pusher", 0)
+
+ def _delete_pushers(txn) -> int:
+
+ sql = """
+ SELECT p.id, access_token FROM pushers AS p
+ LEFT JOIN access_tokens AS a ON (p.access_token = a.id)
+ WHERE p.id > ?
+ ORDER BY p.id ASC
+ LIMIT ?
+ """
+
+ txn.execute(sql, (last_pusher, batch_size))
+ pushers = [(row[0], row[1]) for row in txn]
+
+ self.db_pool.simple_delete_many_txn(
+ txn,
+ table="pushers",
+ column="id",
+ iterable=(pusher_id for pusher_id, token in pushers if token is None),
+ keyvalues={},
+ )
+
+ if pushers:
+ self.db_pool.updates._background_update_progress_txn(
+ txn, "remove_stale_pushers", {"last_pusher": pushers[-1][0]}
+ )
+
+ return len(pushers)
+
+ number_deleted = await self.db_pool.runInteraction(
+ "_remove_stale_pushers", _delete_pushers
+ )
+
+ if number_deleted < batch_size:
+ await self.db_pool.updates._end_background_update("remove_stale_pushers")
+
+ return number_deleted
+
class PusherStore(PusherWorkerStore):
def get_pushers_stream_token(self) -> int:
diff --git a/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql b/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql
index 2442eea6bc..85196db288 100644
--- a/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql
+++ b/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql
@@ -16,4 +16,5 @@
-- Delete all pushers associated with deleted devices. This is to clear up after
-- a bug where they weren't correctly deleted when using workers.
-DELETE FROM pushers WHERE access_token NOT IN (SELECT id FROM access_tokens);
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (5908, 'remove_stale_pushers', '{}');
|