diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/database.py | 11 | ||||
-rw-r--r-- | synapse/storage/databases/main/account_data.py | 83 | ||||
-rw-r--r-- | synapse/storage/databases/main/appservice.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/event_federation.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 7 | ||||
-rw-r--r-- | synapse/storage/databases/main/purge_events.py | 1 | ||||
-rw-r--r-- | synapse/storage/databases/main/relations.py | 65 | ||||
-rw-r--r-- | synapse/storage/databases/main/signatures.py | 54 | ||||
-rw-r--r-- | synapse/storage/databases/main/stream.py | 22 | ||||
-rw-r--r-- | synapse/storage/databases/main/transactions.py | 48 | ||||
-rw-r--r-- | synapse/storage/engines/_base.py | 19 | ||||
-rw-r--r-- | synapse/storage/engines/postgres.py | 33 | ||||
-rw-r--r-- | synapse/storage/engines/sqlite.py | 7 | ||||
-rw-r--r-- | synapse/storage/prepare_database.py | 9 | ||||
-rw-r--r-- | synapse/storage/schema/__init__.py | 11 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/67/01drop_public_room_list_stream.sql | 18 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/68/01event_columns.sql | 26 | ||||
-rw-r--r-- | synapse/storage/state.py | 14 |
18 files changed, 340 insertions, 92 deletions
diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 57cc1d76e0..99802228c9 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -702,6 +702,7 @@ class DatabasePool: func: Callable[..., R], *args: Any, db_autocommit: bool = False, + isolation_level: Optional[int] = None, **kwargs: Any, ) -> R: """Starts a transaction on the database and runs a given function @@ -724,6 +725,7 @@ class DatabasePool: called multiple times if the transaction is retried, so must correctly handle that case. + isolation_level: Set the server isolation level for this transaction. args: positional args to pass to `func` kwargs: named args to pass to `func` @@ -746,6 +748,7 @@ class DatabasePool: func, *args, db_autocommit=db_autocommit, + isolation_level=isolation_level, **kwargs, ) @@ -763,6 +766,7 @@ class DatabasePool: func: Callable[..., R], *args: Any, db_autocommit: bool = False, + isolation_level: Optional[int] = None, **kwargs: Any, ) -> R: """Wraps the .runWithConnection() method on the underlying db_pool. @@ -775,6 +779,7 @@ class DatabasePool: db_autocommit: Whether to run the function in "autocommit" mode, i.e. outside of a transaction. This is useful for transaction that are only a single query. Currently only affects postgres. + isolation_level: Set the server isolation level for this transaction. kwargs: named args to pass to `func` Returns: @@ -834,6 +839,10 @@ class DatabasePool: try: if db_autocommit: self.engine.attempt_to_set_autocommit(conn, True) + if isolation_level is not None: + self.engine.attempt_to_set_isolation_level( + conn, isolation_level + ) db_conn = LoggingDatabaseConnection( conn, self.engine, "runWithConnection" @@ -842,6 +851,8 @@ class DatabasePool: finally: if db_autocommit: self.engine.attempt_to_set_autocommit(conn, False) + if isolation_level: + self.engine.attempt_to_set_isolation_level(conn, None) return await make_deferred_yieldable( self._db_pool.runWithConnection(inner_func, *args, **kwargs) diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index ef475e18c7..5bfa408f74 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -26,6 +26,7 @@ from synapse.storage.database import ( LoggingTransaction, ) from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore +from synapse.storage.databases.main.push_rule import PushRulesWorkerStore from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import ( AbstractStreamIdGenerator, @@ -44,7 +45,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class AccountDataWorkerStore(CacheInvalidationWorkerStore): +class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore): def __init__( self, database: DatabasePool, @@ -158,9 +159,9 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore): "get_account_data_for_user", get_account_data_for_user_txn ) - @cached(num_args=2, max_entries=5000) + @cached(num_args=2, max_entries=5000, tree=True) async def get_global_account_data_by_type_for_user( - self, data_type: str, user_id: str + self, user_id: str, data_type: str ) -> Optional[JsonDict]: """ Returns: @@ -179,7 +180,7 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore): else: return None - @cached(num_args=2) + @cached(num_args=2, tree=True) async def get_account_data_for_room( self, user_id: str, room_id: str ) -> Dict[str, JsonDict]: @@ -210,7 +211,7 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore): "get_account_data_for_room", get_account_data_for_room_txn ) - @cached(num_args=3, max_entries=5000) + @cached(num_args=3, max_entries=5000, tree=True) async def get_account_data_for_room_and_type( self, user_id: str, room_id: str, account_data_type: str ) -> Optional[JsonDict]: @@ -392,7 +393,7 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore): for row in rows: if not row.room_id: self.get_global_account_data_by_type_for_user.invalidate( - (row.data_type, row.user_id) + (row.user_id, row.data_type) ) self.get_account_data_for_user.invalidate((row.user_id,)) self.get_account_data_for_room.invalidate((row.user_id, row.room_id)) @@ -476,7 +477,7 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore): self._account_data_stream_cache.entity_has_changed(user_id, next_id) self.get_account_data_for_user.invalidate((user_id,)) self.get_global_account_data_by_type_for_user.invalidate( - (account_data_type, user_id) + (user_id, account_data_type) ) return self._account_data_id_gen.get_current_token() @@ -546,6 +547,74 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore): for ignored_user_id in previously_ignored_users ^ currently_ignored_users: self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,)) + async def purge_account_data_for_user(self, user_id: str) -> None: + """ + Removes the account data for a user. + + This is intended to be used upon user deactivation and also removes any + derived information from account data (e.g. push rules and ignored users). + + Args: + user_id: The user ID to remove data for. + """ + + def purge_account_data_for_user_txn(txn: LoggingTransaction) -> None: + # Purge from the primary account_data tables. + self.db_pool.simple_delete_txn( + txn, table="account_data", keyvalues={"user_id": user_id} + ) + + self.db_pool.simple_delete_txn( + txn, table="room_account_data", keyvalues={"user_id": user_id} + ) + + # Purge from ignored_users where this user is the ignorer. + # N.B. We don't purge where this user is the ignoree, because that + # interferes with other users' account data. + # It's also not this user's data to delete! + self.db_pool.simple_delete_txn( + txn, table="ignored_users", keyvalues={"ignorer_user_id": user_id} + ) + + # Remove the push rules + self.db_pool.simple_delete_txn( + txn, table="push_rules", keyvalues={"user_name": user_id} + ) + self.db_pool.simple_delete_txn( + txn, table="push_rules_enable", keyvalues={"user_name": user_id} + ) + self.db_pool.simple_delete_txn( + txn, table="push_rules_stream", keyvalues={"user_id": user_id} + ) + + # Invalidate caches as appropriate + self._invalidate_cache_and_stream( + txn, self.get_account_data_for_room_and_type, (user_id,) + ) + self._invalidate_cache_and_stream( + txn, self.get_account_data_for_user, (user_id,) + ) + self._invalidate_cache_and_stream( + txn, self.get_global_account_data_by_type_for_user, (user_id,) + ) + self._invalidate_cache_and_stream( + txn, self.get_account_data_for_room, (user_id,) + ) + self._invalidate_cache_and_stream( + txn, self.get_push_rules_for_user, (user_id,) + ) + self._invalidate_cache_and_stream( + txn, self.get_push_rules_enabled_for_user, (user_id,) + ) + # This user might be contained in the ignored_by cache for other users, + # so we have to invalidate it all. + self._invalidate_all_cache_and_stream(txn, self.ignored_by) + + await self.db_pool.runInteraction( + "purge_account_data_for_user_txn", + purge_account_data_for_user_txn, + ) + class AccountDataStore(AccountDataWorkerStore): pass diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 92c95a41d7..2bb5288431 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -384,7 +384,7 @@ class ApplicationServiceTransactionWorkerStore( "get_new_events_for_appservice", get_new_events_for_appservice_txn ) - events = await self.get_events_as_list(event_ids) + events = await self.get_events_as_list(event_ids, get_prev_content=True) return upper_bound, events diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index a556f17dac..ca71f073fc 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -65,7 +65,7 @@ class _NoChainCoverIndex(Exception): super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,)) -class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBaseStore): +class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore): def __init__( self, database: DatabasePool, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 1ae1ebe108..b7554154ac 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1389,6 +1389,8 @@ class PersistEventsStore: "received_ts", "sender", "contains_url", + "state_key", + "rejection_reason", ), values=( ( @@ -1405,8 +1407,10 @@ class PersistEventsStore: self._clock.time_msec(), event.sender, "url" in event.content and isinstance(event.content["url"], str), + event.get_state_key(), + context.rejected or None, ) - for event, _ in events_and_contexts + for event, context in events_and_contexts ), ) @@ -1456,6 +1460,7 @@ class PersistEventsStore: for event, context in events_and_contexts: if context.rejected: # Insert the event_id into the rejections table + # (events.rejection_reason has already been done) self._store_rejections_txn(txn, event.event_id, context.rejected) to_remove.add(event) diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 91b0576b85..e87a8fb85d 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -390,7 +390,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): "event_search", "events", "group_rooms", - "public_room_list_stream", "receipts_graph", "receipts_linearized", "room_aliases", diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 2cb5d06c13..37468a5183 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -13,17 +13,7 @@ # limitations under the License. import logging -from typing import ( - TYPE_CHECKING, - Any, - Dict, - Iterable, - List, - Optional, - Tuple, - Union, - cast, -) +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union, cast import attr from frozendict import frozendict @@ -43,6 +33,7 @@ from synapse.storage.relations import ( PaginationChunk, RelationPaginationToken, ) +from synapse.types import JsonDict from synapse.util.caches.descriptors import cached if TYPE_CHECKING: @@ -51,6 +42,30 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +@attr.s(slots=True, frozen=True, auto_attribs=True) +class _ThreadAggregation: + latest_event: EventBase + count: int + current_user_participated: bool + + +@attr.s(slots=True, auto_attribs=True) +class BundledAggregations: + """ + The bundled aggregations for an event. + + Some values require additional processing during serialization. + """ + + annotations: Optional[JsonDict] = None + references: Optional[JsonDict] = None + replace: Optional[EventBase] = None + thread: Optional[_ThreadAggregation] = None + + def __bool__(self) -> bool: + return bool(self.annotations or self.references or self.replace or self.thread) + + class RelationsWorkerStore(SQLBaseStore): def __init__( self, @@ -60,7 +75,6 @@ class RelationsWorkerStore(SQLBaseStore): ): super().__init__(database, db_conn, hs) - self._msc1849_enabled = hs.config.experimental.msc1849_enabled self._msc3440_enabled = hs.config.experimental.msc3440_enabled @cached(tree=True) @@ -585,7 +599,7 @@ class RelationsWorkerStore(SQLBaseStore): async def _get_bundled_aggregation_for_event( self, event: EventBase, user_id: str - ) -> Optional[Dict[str, Any]]: + ) -> Optional[BundledAggregations]: """Generate bundled aggregations for an event. Note that this does not use a cache, but depends on cached methods. @@ -616,24 +630,24 @@ class RelationsWorkerStore(SQLBaseStore): # The bundled aggregations to include, a mapping of relation type to a # type-specific value. Some types include the direct return type here # while others need more processing during serialization. - aggregations: Dict[str, Any] = {} + aggregations = BundledAggregations() annotations = await self.get_aggregation_groups_for_event(event_id, room_id) if annotations.chunk: - aggregations[RelationTypes.ANNOTATION] = annotations.to_dict() + aggregations.annotations = annotations.to_dict() references = await self.get_relations_for_event( event_id, room_id, RelationTypes.REFERENCE, direction="f" ) if references.chunk: - aggregations[RelationTypes.REFERENCE] = references.to_dict() + aggregations.references = references.to_dict() edit = None if event.type == EventTypes.Message: edit = await self.get_applicable_edit(event_id, room_id) if edit: - aggregations[RelationTypes.REPLACE] = edit + aggregations.replace = edit # If this event is the start of a thread, include a summary of the replies. if self._msc3440_enabled: @@ -644,11 +658,11 @@ class RelationsWorkerStore(SQLBaseStore): event_id, room_id, user_id ) if latest_thread_event: - aggregations[RelationTypes.THREAD] = { - "latest_event": latest_thread_event, - "count": thread_count, - "current_user_participated": participated, - } + aggregations.thread = _ThreadAggregation( + latest_event=latest_thread_event, + count=thread_count, + current_user_participated=participated, + ) # Store the bundled aggregations in the event metadata for later use. return aggregations @@ -657,7 +671,7 @@ class RelationsWorkerStore(SQLBaseStore): self, events: Iterable[EventBase], user_id: str, - ) -> Dict[str, Dict[str, Any]]: + ) -> Dict[str, BundledAggregations]: """Generate bundled aggregations for events. Args: @@ -668,15 +682,12 @@ class RelationsWorkerStore(SQLBaseStore): A map of event ID to the bundled aggregation for the event. Not all events may have bundled aggregations in the results. """ - # If bundled aggregations are disabled, nothing to do. - if not self._msc1849_enabled: - return {} # TODO Parallelize. results = {} for event in events: event_result = await self._get_bundled_aggregation_for_event(event, user_id) - if event_result is not None: + if event_result: results[event.event_id] = event_result return results diff --git a/synapse/storage/databases/main/signatures.py b/synapse/storage/databases/main/signatures.py index 3201623fe4..0518b8b910 100644 --- a/synapse/storage/databases/main/signatures.py +++ b/synapse/storage/databases/main/signatures.py @@ -12,16 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, Iterable, List, Tuple +from typing import Collection, Dict, List, Tuple from unpaddedbase64 import encode_base64 -from synapse.storage._base import SQLBaseStore -from synapse.storage.types import Cursor +from synapse.crypto.event_signing import compute_event_reference_hash +from synapse.storage.databases.main.events_worker import ( + EventRedactBehaviour, + EventsWorkerStore, +) from synapse.util.caches.descriptors import cached, cachedList -class SignatureWorkerStore(SQLBaseStore): +class SignatureWorkerStore(EventsWorkerStore): @cached() def get_event_reference_hash(self, event_id): # This is a dummy function to allow get_event_reference_hashes @@ -32,7 +35,7 @@ class SignatureWorkerStore(SQLBaseStore): cached_method_name="get_event_reference_hash", list_name="event_ids", num_args=1 ) async def get_event_reference_hashes( - self, event_ids: Iterable[str] + self, event_ids: Collection[str] ) -> Dict[str, Dict[str, bytes]]: """Get all hashes for given events. @@ -41,18 +44,27 @@ class SignatureWorkerStore(SQLBaseStore): Returns: A mapping of event ID to a mapping of algorithm to hash. + Returns an empty dict for a given event id if that event is unknown. """ + events = await self.get_events( + event_ids, + redact_behaviour=EventRedactBehaviour.AS_IS, + allow_rejected=True, + ) - def f(txn): - return { - event_id: self._get_event_reference_hashes_txn(txn, event_id) - for event_id in event_ids - } + hashes: Dict[str, Dict[str, bytes]] = {} + for event_id in event_ids: + event = events.get(event_id) + if event is None: + hashes[event_id] = {} + else: + ref_alg, ref_hash_bytes = compute_event_reference_hash(event) + hashes[event_id] = {ref_alg: ref_hash_bytes} - return await self.db_pool.runInteraction("get_event_reference_hashes", f) + return hashes async def add_event_hashes( - self, event_ids: Iterable[str] + self, event_ids: Collection[str] ) -> List[Tuple[str, Dict[str, str]]]: """ @@ -70,24 +82,6 @@ class SignatureWorkerStore(SQLBaseStore): return list(encoded_hashes.items()) - def _get_event_reference_hashes_txn( - self, txn: Cursor, event_id: str - ) -> Dict[str, bytes]: - """Get all the hashes for a given PDU. - Args: - txn: - event_id: Id for the Event. - Returns: - A mapping of algorithm -> hash. - """ - query = ( - "SELECT algorithm, hash" - " FROM event_reference_hashes" - " WHERE event_id = ?" - ) - txn.execute(query, (event_id,)) - return {k: v for k, v in txn} - class SignatureStore(SignatureWorkerStore): """Persistence for event signatures and hashes""" diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 319464b1fa..a898f847e7 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -81,6 +81,14 @@ class _EventDictReturn: stream_ordering: int +@attr.s(slots=True, frozen=True, auto_attribs=True) +class _EventsAround: + events_before: List[EventBase] + events_after: List[EventBase] + start: RoomStreamToken + end: RoomStreamToken + + def generate_pagination_where_clause( direction: str, column_names: Tuple[str, str], @@ -846,7 +854,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): before_limit: int, after_limit: int, event_filter: Optional[Filter] = None, - ) -> dict: + ) -> _EventsAround: """Retrieve events and pagination tokens around a given event in a room. """ @@ -869,12 +877,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): list(results["after"]["event_ids"]), get_prev_content=True ) - return { - "events_before": events_before, - "events_after": events_after, - "start": results["before"]["token"], - "end": results["after"]["token"], - } + return _EventsAround( + events_before=events_before, + events_after=events_after, + start=results["before"]["token"], + end=results["after"]["token"], + ) def _get_events_around_txn( self, diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 4b78b4d098..ba79e19f7f 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -561,6 +561,54 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): "get_destinations_paginate_txn", get_destinations_paginate_txn ) + async def get_destination_rooms_paginate( + self, destination: str, start: int, limit: int, direction: str = "f" + ) -> Tuple[List[JsonDict], int]: + """Function to retrieve a paginated list of destination's rooms. + This will return a json list of rooms and the + total number of rooms. + + Args: + destination: the destination to query + start: start number to begin the query from + limit: number of rows to retrieve + direction: sort ascending or descending by room_id + Returns: + A tuple of a dict of rooms and a count of total rooms. + """ + + def get_destination_rooms_paginate_txn( + txn: LoggingTransaction, + ) -> Tuple[List[JsonDict], int]: + + if direction == "b": + order = "DESC" + else: + order = "ASC" + + sql = """ + SELECT COUNT(*) as total_rooms + FROM destination_rooms + WHERE destination = ? + """ + txn.execute(sql, [destination]) + count = cast(Tuple[int], txn.fetchone())[0] + + rooms = self.db_pool.simple_select_list_paginate_txn( + txn=txn, + table="destination_rooms", + orderby="room_id", + start=start, + limit=limit, + retcols=("room_id", "stream_ordering"), + order_direction=order, + ) + return rooms, count + + return await self.db_pool.runInteraction( + "get_destination_rooms_paginate_txn", get_destination_rooms_paginate_txn + ) + async def is_destination_known(self, destination: str) -> bool: """Check if a destination is known to the server.""" result = await self.db_pool.simple_select_one_onecol( diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 20cd63c330..143cd98ca2 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -12,11 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. import abc -from typing import Generic, TypeVar +from enum import IntEnum +from typing import Generic, Optional, TypeVar from synapse.storage.types import Connection +class IsolationLevel(IntEnum): + READ_COMMITTED: int = 1 + REPEATABLE_READ: int = 2 + SERIALIZABLE: int = 3 + + class IncorrectDatabaseSetup(RuntimeError): pass @@ -109,3 +116,13 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta): commit/rollback the connections. """ ... + + @abc.abstractmethod + def attempt_to_set_isolation_level( + self, conn: Connection, isolation_level: Optional[int] + ): + """Attempt to set the connections isolation level. + + Note: This has no effect on SQLite3, as transactions are SERIALIZABLE by default. + """ + ... diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 30f948a0f7..808342fafb 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -13,8 +13,13 @@ # limitations under the License. import logging +from typing import Mapping, Optional -from synapse.storage.engines._base import BaseDatabaseEngine, IncorrectDatabaseSetup +from synapse.storage.engines._base import ( + BaseDatabaseEngine, + IncorrectDatabaseSetup, + IsolationLevel, +) from synapse.storage.types import Connection logger = logging.getLogger(__name__) @@ -34,6 +39,15 @@ class PostgresEngine(BaseDatabaseEngine): self.synchronous_commit = database_config.get("synchronous_commit", True) self._version = None # unknown as yet + self.isolation_level_map: Mapping[int, int] = { + IsolationLevel.READ_COMMITTED: self.module.extensions.ISOLATION_LEVEL_READ_COMMITTED, + IsolationLevel.REPEATABLE_READ: self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ, + IsolationLevel.SERIALIZABLE: self.module.extensions.ISOLATION_LEVEL_SERIALIZABLE, + } + self.default_isolation_level = ( + self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ + ) + @property def single_threaded(self) -> bool: return False @@ -46,8 +60,8 @@ class PostgresEngine(BaseDatabaseEngine): self._version = db_conn.server_version # Are we on a supported PostgreSQL version? - if not allow_outdated_version and self._version < 90600: - raise RuntimeError("Synapse requires PostgreSQL 9.6 or above.") + if not allow_outdated_version and self._version < 100000: + raise RuntimeError("Synapse requires PostgreSQL 10 or above.") with db_conn.cursor() as txn: txn.execute("SHOW SERVER_ENCODING") @@ -104,9 +118,7 @@ class PostgresEngine(BaseDatabaseEngine): return sql.replace("?", "%s") def on_new_connection(self, db_conn): - db_conn.set_isolation_level( - self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ - ) + db_conn.set_isolation_level(self.default_isolation_level) # Set the bytea output to escape, vs the default of hex cursor = db_conn.cursor() @@ -175,3 +187,12 @@ class PostgresEngine(BaseDatabaseEngine): def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool): return conn.set_session(autocommit=autocommit) # type: ignore + + def attempt_to_set_isolation_level( + self, conn: Connection, isolation_level: Optional[int] + ): + if isolation_level is None: + isolation_level = self.default_isolation_level + else: + isolation_level = self.isolation_level_map[isolation_level] + return conn.set_isolation_level(isolation_level) # type: ignore diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 70d17d4f2c..6c19e55999 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -15,6 +15,7 @@ import platform import struct import threading import typing +from typing import Optional from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.types import Connection @@ -122,6 +123,12 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]): # set the connection to autocommit mode. pass + def attempt_to_set_isolation_level( + self, conn: Connection, isolation_level: Optional[int] + ): + # All transactions are SERIALIZABLE by default in sqllite + pass + # Following functions taken from: https://github.com/coleifer/peewee diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 1823e18720..e3153d1a4a 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -499,9 +499,12 @@ def _upgrade_existing_database( module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # type: ignore - logger.info("Running script %s", relative_path) - module.run_create(cur, database_engine) # type: ignore - if not is_empty: + if hasattr(module, "run_create"): + logger.info("Running %s:run_create", relative_path) + module.run_create(cur, database_engine) # type: ignore + + if not is_empty and hasattr(module, "run_upgrade"): + logger.info("Running %s:run_upgrade", relative_path) module.run_upgrade(cur, database_engine, config=config) # type: ignore elif ext == ".pyc" or file_name == "__pycache__": # Sometimes .pyc files turn up anyway even though we've diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 2a3d47185a..7b21c1b96d 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 67 # remember to update the list below when updating +SCHEMA_VERSION = 68 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -53,11 +53,18 @@ Changes in SCHEMA_VERSION = 66: Changes in SCHEMA_VERSION = 67: - state_events.prev_state is no longer written to. + +Changes in SCHEMA_VERSION = 68: + - event_reference_hashes is no longer read. + - `events` has `state_key` and `rejection_reason` columns, which are populated for + new events. """ SCHEMA_COMPAT_VERSION = ( - 61 # 61: Remove unused tables `user_stats_historical` and `room_stats_historical` + # we now have `state_key` columns in both `events` and `state_events`, so + # now incompatible with synapses wth SCHEMA_VERSION < 66. + 66 ) """Limit on how far the synapse codebase can be rolled back without breaking db compat diff --git a/synapse/storage/schema/main/delta/67/01drop_public_room_list_stream.sql b/synapse/storage/schema/main/delta/67/01drop_public_room_list_stream.sql new file mode 100644 index 0000000000..1eb8de9907 --- /dev/null +++ b/synapse/storage/schema/main/delta/67/01drop_public_room_list_stream.sql @@ -0,0 +1,18 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- this table is unused as of Synapse 1.41 +DROP TABLE public_room_list_stream; + diff --git a/synapse/storage/schema/main/delta/68/01event_columns.sql b/synapse/storage/schema/main/delta/68/01event_columns.sql new file mode 100644 index 0000000000..7c072f972e --- /dev/null +++ b/synapse/storage/schema/main/delta/68/01event_columns.sql @@ -0,0 +1,26 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add new colums to the `events` table which will (one day) make the `state_events` +-- and `rejections` tables redundant. + +ALTER TABLE events + -- if this event is a state event, its state key + ADD COLUMN state_key TEXT DEFAULT NULL; + + +ALTER TABLE events + -- if this event was rejected, the reason it was rejected. + ADD COLUMN rejection_reason TEXT DEFAULT NULL; diff --git a/synapse/storage/state.py b/synapse/storage/state.py index df8b2f1088..913448f0f9 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -74,21 +74,21 @@ class StateFilter: @staticmethod def all() -> "StateFilter": - """Creates a filter that fetches everything. + """Returns a filter that fetches everything. Returns: - The new state filter. + The state filter. """ - return StateFilter(types=frozendict(), include_others=True) + return _ALL_STATE_FILTER @staticmethod def none() -> "StateFilter": - """Creates a filter that fetches nothing. + """Returns a filter that fetches nothing. Returns: The new state filter. """ - return StateFilter(types=frozendict(), include_others=False) + return _NONE_STATE_FILTER @staticmethod def from_types(types: Iterable[Tuple[str, Optional[str]]]) -> "StateFilter": @@ -527,6 +527,10 @@ class StateFilter: ) +_ALL_STATE_FILTER = StateFilter(types=frozendict(), include_others=True) +_NONE_STATE_FILTER = StateFilter(types=frozendict(), include_others=False) + + class StateGroupStorage: """High level interface to fetching state for event.""" |