summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/__init__.py10
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py4
-rw-r--r--synapse/storage/databases/main/media_repository.py2
-rw-r--r--synapse/storage/databases/main/purge_events.py42
4 files changed, 46 insertions, 12 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py

index 70b49854cf..1d44c3aa2c 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 
 import logging
-from typing import Any, Dict, List, Optional, Tuple
+from typing import List, Optional, Tuple
 
 from synapse.api.constants import PresenceState
 from synapse.config.homeserver import HomeServerConfig
@@ -27,7 +27,7 @@ from synapse.storage.util.id_generators import (
     MultiWriterIdGenerator,
     StreamIdGenerator,
 )
-from synapse.types import get_domain_from_id
+from synapse.types import JsonDict, get_domain_from_id
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 from .account_data import AccountDataStore
@@ -264,7 +264,7 @@ class DataStore(
 
         return [UserPresenceState(**row) for row in rows]
 
-    async def get_users(self) -> List[Dict[str, Any]]:
+    async def get_users(self) -> List[JsonDict]:
         """Function to retrieve a list of users in users table.
 
         Returns:
@@ -292,7 +292,7 @@ class DataStore(
         name: Optional[str] = None,
         guests: bool = True,
         deactivated: bool = False,
-    ) -> Tuple[List[Dict[str, Any]], int]:
+    ) -> Tuple[List[JsonDict], int]:
         """Function to retrieve a paginated list of users from
         users list. This will return a json list of users and the
         total number of users matching the filter criteria.
@@ -353,7 +353,7 @@ class DataStore(
             "get_users_paginate_txn", get_users_paginate_txn
         )
 
-    async def search_users(self, term: str) -> Optional[List[Dict[str, Any]]]:
+    async def search_users(self, term: str) -> Optional[List[JsonDict]]:
         """Function to search users list for one or more
         users with the matched term.
 
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index c1626ccf28..cb6b1f8a0c 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -696,7 +696,9 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             )
 
             if not has_event_auth:
-                for auth_id in event.auth_event_ids():
+                # Old, dodgy, events may have duplicate auth events, which we
+                # need to deduplicate as we have a unique constraint.
+                for auth_id in set(event.auth_event_ids()):
                     auth_events.append(
                         {
                             "room_id": event.room_id,
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index 274f8de595..4f3d192562 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -139,7 +139,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         start: int,
         limit: int,
         user_id: str,
-        order_by: MediaSortOrder = MediaSortOrder.CREATED_TS.value,
+        order_by: str = MediaSortOrder.CREATED_TS.value,
         direction: str = "f",
     ) -> Tuple[List[Dict[str, Any]], int]:
         """Get a paginated list of metadata for a local piece of media
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index ecfc9f20b1..0836e4af49 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -28,7 +28,10 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
     async def purge_history(
         self, room_id: str, token: str, delete_local_events: bool
     ) -> Set[int]:
-        """Deletes room history before a certain point
+        """Deletes room history before a certain point.
+
+        Note that only a single purge can occur at once, this is guaranteed via
+        a higher level (in the PaginationHandler).
 
         Args:
             room_id:
@@ -52,7 +55,9 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
             delete_local_events,
         )
 
-    def _purge_history_txn(self, txn, room_id, token, delete_local_events):
+    def _purge_history_txn(
+        self, txn, room_id: str, token: RoomStreamToken, delete_local_events: bool
+    ) -> Set[int]:
         # Tables that should be pruned:
         #     event_auth
         #     event_backward_extremities
@@ -103,7 +108,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
         if max_depth < token.topological:
             # We need to ensure we don't delete all the events from the database
             # otherwise we wouldn't be able to send any events (due to not
-            # having any backwards extremeties)
+            # having any backwards extremities)
             raise SynapseError(
                 400, "topological_ordering is greater than forward extremeties"
             )
@@ -154,7 +159,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
 
         logger.info("[purge] Finding new backward extremities")
 
-        # We calculate the new entries for the backward extremeties by finding
+        # We calculate the new entries for the backward extremities by finding
         # events to be purged that are pointed to by events we're not going to
         # purge.
         txn.execute(
@@ -296,7 +301,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
             "purge_room", self._purge_room_txn, room_id
         )
 
-    def _purge_room_txn(self, txn, room_id):
+    def _purge_room_txn(self, txn, room_id: str) -> List[int]:
         # First we fetch all the state groups that should be deleted, before
         # we delete that information.
         txn.execute(
@@ -310,6 +315,31 @@
 
         state_groups = [row[0] for row in txn]
 
+        # Get all the auth chains that are referenced by events that are to be
+        # deleted.
+        txn.execute(
+            """
+            SELECT chain_id, sequence_number FROM events
+            LEFT JOIN event_auth_chains USING (event_id)
+            WHERE room_id = ?
+            """,
+            (room_id,),
+        )
+        referenced_chain_id_tuples = list(txn)
+
+        logger.info("[purge] removing events from event_auth_chain_links")
+        txn.executemany(
+            """
+            DELETE FROM event_auth_chain_links WHERE
+            (origin_chain_id = ? AND origin_sequence_number = ?) OR
+            (target_chain_id = ? AND target_sequence_number = ?)
+            """,
+            (
+                (chain_id, seq_num, chain_id, seq_num)
+                for (chain_id, seq_num) in referenced_chain_id_tuples
+            ),
+        )
+
         # Now we delete tables which lack an index on room_id but have one on event_id
         for table in (
             "event_auth",
@@ -319,6 +349,8 @@
             "event_reference_hashes",
             "event_relations",
             "event_to_state_groups",
+            "event_auth_chains",
+            "event_auth_chain_to_calculate",
             "redactions",
             "rejections",
             "state_events",