diff options
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r-- | synapse/storage/databases/main/__init__.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/event_federation.py | 211 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 125 | ||||
-rw-r--r-- | synapse/storage/databases/main/room_batch.py | 47 |
4 files changed, 1 insertions, 384 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 0032a92f49..3a10c265c9 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -61,7 +61,6 @@ from .registration import RegistrationStore from .rejections import RejectionsStore from .relations import RelationsStore from .room import RoomStore -from .room_batch import RoomBatchStore from .roommember import RoomMemberStore from .search import SearchStore from .session import SessionStore @@ -87,7 +86,6 @@ class DataStore( DeviceStore, RoomMemberStore, RoomStore, - RoomBatchStore, RegistrationStore, ProfileStore, PresenceStore, diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 2681917d0b..8b6e3c1dc7 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -31,7 +31,7 @@ from typing import ( import attr from prometheus_client import Counter, Gauge -from synapse.api.constants import MAX_DEPTH, EventTypes +from synapse.api.constants import MAX_DEPTH from synapse.api.errors import StoreError from synapse.api.room_versions import EventFormatVersions, RoomVersion from synapse.events import EventBase, make_event_from_dict @@ -891,124 +891,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas room_id, ) - @trace - async def get_insertion_event_backward_extremities_in_room( - self, - room_id: str, - current_depth: int, - limit: int, - ) -> List[Tuple[str, int]]: - """ - Get the insertion events we know about that we haven't backfilled yet - along with the approximate depth. Only returns insertion events that are - at a depth lower than or equal to the `current_depth`. Sorted by depth, - highest to lowest (descending) so the closest events to the - `current_depth` are first in the list. - - We ignore insertion events that are newer than the user's current scroll - position (ie, those with depth greater than `current_depth`) as: - 1. we don't really care about getting events that have happened - after our current position; and - 2. by the nature of paginating and scrolling back, we have likely - previously tried and failed to backfill from that insertion event, so - to avoid getting "stuck" requesting the same backfill repeatedly - we drop those insertion event. - - Args: - room_id: Room where we want to find the oldest events - current_depth: The depth at the user's current scrollback position - limit: The max number of insertion event extremities to return - - Returns: - List of (event_id, depth) tuples. Sorted by depth, highest to lowest - (descending) so the closest events to the `current_depth` are first - in the list. - """ - - def get_insertion_event_backward_extremities_in_room_txn( - txn: LoggingTransaction, room_id: str - ) -> List[Tuple[str, int]]: - if isinstance(self.database_engine, PostgresEngine): - least_function = "LEAST" - elif isinstance(self.database_engine, Sqlite3Engine): - least_function = "MIN" - else: - raise RuntimeError("Unknown database engine") - - sql = f""" - SELECT - insertion_event_extremity.event_id, event.depth - /* We only want insertion events that are also marked as backwards extremities */ - FROM insertion_event_extremities AS insertion_event_extremity - /* Get the depth of the insertion event from the events table */ - INNER JOIN events AS event USING (event_id) - /** - * We use this info to make sure we don't retry to use a backfill point - * if we've already attempted to backfill from it recently. - */ - LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info - ON - failed_backfill_attempt_info.room_id = insertion_event_extremity.room_id - AND failed_backfill_attempt_info.event_id = insertion_event_extremity.event_id - WHERE - insertion_event_extremity.room_id = ? - /** - * We only want extremities that are older than or at - * the same position of the given `current_depth` (where older - * means less than the given depth) because we're looking backwards - * from the `current_depth` when backfilling. - * - * current_depth (ignore events that come after this, ignore 2-4) - * | - * ▼ - * <oldest-in-time> [0]<--[1]<--[2]<--[3]<--[4] <newest-in-time> - */ - AND event.depth <= ? /* current_depth */ - /** - * Exponential back-off (up to the upper bound) so we don't retry the - * same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc - * - * We use `1 << n` as a power of 2 equivalent for compatibility - * with older SQLites. The left shift equivalent only works with - * powers of 2 because left shift is a binary operation (base-2). - * Otherwise, we would use `power(2, n)` or the power operator, `2^n`. - */ - AND ( - failed_backfill_attempt_info.event_id IS NULL - OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + ( - (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */)) - * ? /* step */ - ) - ) - /** - * Sort from highest (closest to the `current_depth`) to the lowest depth - * because the closest are most relevant to backfill from first. - * Then tie-break on alphabetical order of the event_ids so we get a - * consistent ordering which is nice when asserting things in tests. - */ - ORDER BY event.depth DESC, insertion_event_extremity.event_id DESC - LIMIT ? - """ - - txn.execute( - sql, - ( - room_id, - current_depth, - self._clock.time_msec(), - BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, - BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS, - limit, - ), - ) - return cast(List[Tuple[str, int]], txn.fetchall()) - - return await self.db_pool.runInteraction( - "get_insertion_event_backward_extremities_in_room", - get_insertion_event_backward_extremities_in_room_txn, - room_id, - ) - async def get_max_depth_of( self, event_ids: Collection[str] ) -> Tuple[Optional[str], int]: @@ -1280,50 +1162,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas return event_ids - def _get_connected_batch_event_backfill_results_txn( - self, txn: LoggingTransaction, insertion_event_id: str, limit: int - ) -> List[BackfillQueueNavigationItem]: - """ - Find any batch connections of a given insertion event. - A batch event points at a insertion event via: - batch_event.content[MSC2716_BATCH_ID] -> insertion_event.content[MSC2716_NEXT_BATCH_ID] - - Args: - txn: The database transaction to use - insertion_event_id: The event ID to navigate from. We will find - batch events that point back at this insertion event. - limit: Max number of event ID's to query for and return - - Returns: - List of batch events that the backfill queue can process - """ - batch_connection_query = """ - SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i - /* Find the batch that connects to the given insertion event */ - INNER JOIN batch_events AS c - ON i.next_batch_id = c.batch_id - /* Get the depth of the batch start event from the events table */ - INNER JOIN events AS e ON c.event_id = e.event_id - /* Find an insertion event which matches the given event_id */ - WHERE i.event_id = ? - LIMIT ? - """ - - # Find any batch connections for the given insertion event - txn.execute( - batch_connection_query, - (insertion_event_id, limit), - ) - return [ - BackfillQueueNavigationItem( - depth=row[0], - stream_ordering=row[1], - event_id=row[2], - type=row[3], - ) - for row in txn - ] - def _get_connected_prev_event_backfill_results_txn( self, txn: LoggingTransaction, event_id: str, limit: int ) -> List[BackfillQueueNavigationItem]: @@ -1472,40 +1310,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas event_id_results.add(event_id) - # Try and find any potential historical batches of message history. - if self.hs.config.experimental.msc2716_enabled: - # We need to go and try to find any batch events connected - # to a given insertion event (by batch_id). If we find any, we'll - # add them to the queue and navigate up the DAG like normal in the - # next iteration of the loop. - if event_type == EventTypes.MSC2716_INSERTION: - # Find any batch connections for the given insertion event - connected_batch_event_backfill_results = ( - self._get_connected_batch_event_backfill_results_txn( - txn, event_id, limit - len(event_id_results) - ) - ) - logger.debug( - "_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s", - room_id, - connected_batch_event_backfill_results, - ) - for ( - connected_batch_event_backfill_item - ) in connected_batch_event_backfill_results: - if ( - connected_batch_event_backfill_item.event_id - not in event_id_results - ): - queue.put( - ( - -connected_batch_event_backfill_item.depth, - -connected_batch_event_backfill_item.stream_ordering, - connected_batch_event_backfill_item.event_id, - connected_batch_event_backfill_item.type, - ) - ) - # Now we just look up the DAG by prev_events as normal connected_prev_event_backfill_results = ( self._get_connected_prev_event_backfill_results_txn( @@ -1748,19 +1552,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas _delete_old_forward_extrem_cache_txn, ) - @trace - async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None: - await self.db_pool.simple_upsert( - table="insertion_event_extremities", - keyvalues={"event_id": event_id}, - values={ - "event_id": event_id, - "room_id": room_id, - }, - insertion_values={}, - desc="insert_insertion_extremity", - ) - async def insert_received_event_to_staging( self, origin: str, event: EventBase ) -> None: diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 44af3357af..5c9db7554e 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1664,9 +1664,6 @@ class PersistEventsStore: self._handle_event_relations(txn, event) - self._handle_insertion_event(txn, event) - self._handle_batch_event(txn, event) - # Store the labels for this event. labels = event.content.get(EventContentFields.LABELS) if labels: @@ -1927,128 +1924,6 @@ class PersistEventsStore: ), ) - def _handle_insertion_event( - self, txn: LoggingTransaction, event: EventBase - ) -> None: - """Handles keeping track of insertion events and edges/connections. - Part of MSC2716. - - Args: - txn: The database transaction object - event: The event to process - """ - - if event.type != EventTypes.MSC2716_INSERTION: - # Not a insertion event - return - - # Skip processing an insertion event if the room version doesn't - # support it or the event is not from the room creator. - room_version = self.store.get_room_version_txn(txn, event.room_id) - room_creator = self.db_pool.simple_select_one_onecol_txn( - txn, - table="rooms", - keyvalues={"room_id": event.room_id}, - retcol="creator", - allow_none=True, - ) - if not room_version.msc2716_historical and ( - not self.hs.config.experimental.msc2716_enabled - or event.sender != room_creator - ): - return - - next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID) - if next_batch_id is None: - # Invalid insertion event without next batch ID - return - - logger.debug( - "_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event - ) - - # Keep track of the insertion event and the batch ID - self.db_pool.simple_insert_txn( - txn, - table="insertion_events", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "next_batch_id": next_batch_id, - }, - ) - - # Insert an edge for every prev_event connection - for prev_event_id in event.prev_event_ids(): - self.db_pool.simple_insert_txn( - txn, - table="insertion_event_edges", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "insertion_prev_event_id": prev_event_id, - }, - ) - - def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase) -> None: - """Handles inserting the batch edges/connections between the batch event - and an insertion event. Part of MSC2716. - - Args: - txn: The database transaction object - event: The event to process - """ - - if event.type != EventTypes.MSC2716_BATCH: - # Not a batch event - return - - # Skip processing a batch event if the room version doesn't - # support it or the event is not from the room creator. - room_version = self.store.get_room_version_txn(txn, event.room_id) - room_creator = self.db_pool.simple_select_one_onecol_txn( - txn, - table="rooms", - keyvalues={"room_id": event.room_id}, - retcol="creator", - allow_none=True, - ) - if not room_version.msc2716_historical and ( - not self.hs.config.experimental.msc2716_enabled - or event.sender != room_creator - ): - return - - batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID) - if batch_id is None: - # Invalid batch event without a batch ID - return - - logger.debug("_handle_batch_event batch_id=%s %s", batch_id, event) - - # Keep track of the insertion event and the batch ID - self.db_pool.simple_insert_txn( - txn, - table="batch_events", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "batch_id": batch_id, - }, - ) - - # When we receive an event with a `batch_id` referencing the - # `next_batch_id` of the insertion event, we can remove it from the - # `insertion_event_extremities` table. - sql = """ - DELETE FROM insertion_event_extremities WHERE event_id IN ( - SELECT event_id FROM insertion_events - WHERE next_batch_id = ? - ) - """ - - txn.execute(sql, (batch_id,)) - def _handle_redact_relations( self, txn: LoggingTransaction, room_id: str, redacted_event_id: str ) -> None: diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py deleted file mode 100644 index 131f357d04..0000000000 --- a/synapse/storage/databases/main/room_batch.py +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright 2021 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Optional - -from synapse.storage._base import SQLBaseStore - - -class RoomBatchStore(SQLBaseStore): - async def get_insertion_event_id_by_batch_id( - self, room_id: str, batch_id: str - ) -> Optional[str]: - """Retrieve a insertion event ID. - - Args: - batch_id: The batch ID of the insertion event to retrieve. - - Returns: - The event_id of an insertion event, or None if there is no known - insertion event for the given insertion event. - """ - return await self.db_pool.simple_select_one_onecol( - table="insertion_events", - keyvalues={"room_id": room_id, "next_batch_id": batch_id}, - retcol="event_id", - allow_none=True, - ) - - async def store_state_group_id_for_event_id( - self, event_id: str, state_group_id: int - ) -> None: - await self.db_pool.simple_upsert( - table="event_to_state_groups", - keyvalues={"event_id": event_id}, - values={"state_group": state_group_id, "event_id": event_id}, - ) |