summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/__init__.py2
-rw-r--r--synapse/storage/databases/main/event_federation.py211
-rw-r--r--synapse/storage/databases/main/events.py125
-rw-r--r--synapse/storage/databases/main/room_batch.py47
4 files changed, 1 insertions, 384 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 0032a92f49..3a10c265c9 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -61,7 +61,6 @@ from .registration import RegistrationStore
 from .rejections import RejectionsStore
 from .relations import RelationsStore
 from .room import RoomStore
-from .room_batch import RoomBatchStore
 from .roommember import RoomMemberStore
 from .search import SearchStore
 from .session import SessionStore
@@ -87,7 +86,6 @@ class DataStore(
     DeviceStore,
     RoomMemberStore,
     RoomStore,
-    RoomBatchStore,
     RegistrationStore,
     ProfileStore,
     PresenceStore,
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 2681917d0b..8b6e3c1dc7 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -31,7 +31,7 @@ from typing import (
 import attr
 from prometheus_client import Counter, Gauge
 
-from synapse.api.constants import MAX_DEPTH, EventTypes
+from synapse.api.constants import MAX_DEPTH
 from synapse.api.errors import StoreError
 from synapse.api.room_versions import EventFormatVersions, RoomVersion
 from synapse.events import EventBase, make_event_from_dict
@@ -891,124 +891,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             room_id,
         )
 
-    @trace
-    async def get_insertion_event_backward_extremities_in_room(
-        self,
-        room_id: str,
-        current_depth: int,
-        limit: int,
-    ) -> List[Tuple[str, int]]:
-        """
-        Get the insertion events we know about that we haven't backfilled yet
-        along with the approximate depth. Only returns insertion events that are
-        at a depth lower than or equal to the `current_depth`. Sorted by depth,
-        highest to lowest (descending) so the closest events to the
-        `current_depth` are first in the list.
-
-        We ignore insertion events that are newer than the user's current scroll
-        position (ie, those with depth greater than `current_depth`) as:
-            1. we don't really care about getting events that have happened
-               after our current position; and
-            2. by the nature of paginating and scrolling back, we have likely
-               previously tried and failed to backfill from that insertion event, so
-               to avoid getting "stuck" requesting the same backfill repeatedly
-               we drop those insertion event.
-
-        Args:
-            room_id: Room where we want to find the oldest events
-            current_depth: The depth at the user's current scrollback position
-            limit: The max number of insertion event extremities to return
-
-        Returns:
-            List of (event_id, depth) tuples. Sorted by depth, highest to lowest
-            (descending) so the closest events to the `current_depth` are first
-            in the list.
-        """
-
-        def get_insertion_event_backward_extremities_in_room_txn(
-            txn: LoggingTransaction, room_id: str
-        ) -> List[Tuple[str, int]]:
-            if isinstance(self.database_engine, PostgresEngine):
-                least_function = "LEAST"
-            elif isinstance(self.database_engine, Sqlite3Engine):
-                least_function = "MIN"
-            else:
-                raise RuntimeError("Unknown database engine")
-
-            sql = f"""
-                SELECT
-                    insertion_event_extremity.event_id, event.depth
-                /* We only want insertion events that are also marked as backwards extremities */
-                FROM insertion_event_extremities AS insertion_event_extremity
-                /* Get the depth of the insertion event from the events table */
-                INNER JOIN events AS event USING (event_id)
-                /**
-                 * We use this info to make sure we don't retry to use a backfill point
-                 * if we've already attempted to backfill from it recently.
-                 */
-                LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info
-                ON
-                    failed_backfill_attempt_info.room_id = insertion_event_extremity.room_id
-                    AND failed_backfill_attempt_info.event_id = insertion_event_extremity.event_id
-                WHERE
-                    insertion_event_extremity.room_id = ?
-                    /**
-                     * We only want extremities that are older than or at
-                     * the same position of the given `current_depth` (where older
-                     * means less than the given depth) because we're looking backwards
-                     * from the `current_depth` when backfilling.
-                     *
-                     *                         current_depth (ignore events that come after this, ignore 2-4)
-                     *                         |
-                     *                         ▼
-                     * <oldest-in-time> [0]<--[1]<--[2]<--[3]<--[4] <newest-in-time>
-                     */
-                    AND event.depth <= ? /* current_depth */
-                    /**
-                     * Exponential back-off (up to the upper bound) so we don't retry the
-                     * same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc
-                     *
-                     * We use `1 << n` as a power of 2 equivalent for compatibility
-                     * with older SQLites. The left shift equivalent only works with
-                     * powers of 2 because left shift is a binary operation (base-2).
-                     * Otherwise, we would use `power(2, n)` or the power operator, `2^n`.
-                     */
-                    AND (
-                        failed_backfill_attempt_info.event_id IS NULL
-                        OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + (
-                            (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */))
-                            * ? /* step */
-                        )
-                    )
-                /**
-                 * Sort from highest (closest to the `current_depth`) to the lowest depth
-                 * because the closest are most relevant to backfill from first.
-                 * Then tie-break on alphabetical order of the event_ids so we get a
-                 * consistent ordering which is nice when asserting things in tests.
-                 */
-                ORDER BY event.depth DESC, insertion_event_extremity.event_id DESC
-                LIMIT ?
-            """
-
-            txn.execute(
-                sql,
-                (
-                    room_id,
-                    current_depth,
-                    self._clock.time_msec(),
-                    BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
-                    BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS,
-                    limit,
-                ),
-            )
-            return cast(List[Tuple[str, int]], txn.fetchall())
-
-        return await self.db_pool.runInteraction(
-            "get_insertion_event_backward_extremities_in_room",
-            get_insertion_event_backward_extremities_in_room_txn,
-            room_id,
-        )
-
     async def get_max_depth_of(
         self, event_ids: Collection[str]
     ) -> Tuple[Optional[str], int]:
@@ -1280,50 +1162,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
 
         return event_ids
 
-    def _get_connected_batch_event_backfill_results_txn(
-        self, txn: LoggingTransaction, insertion_event_id: str, limit: int
-    ) -> List[BackfillQueueNavigationItem]:
-        """
-        Find any batch connections of a given insertion event.
-        A batch event points at a insertion event via:
-        batch_event.content[MSC2716_BATCH_ID] -> insertion_event.content[MSC2716_NEXT_BATCH_ID]
-
-        Args:
-            txn: The database transaction to use
-            insertion_event_id: The event ID to navigate from. We will find
-                batch events that point back at this insertion event.
-            limit: Max number of event ID's to query for and return
-
-        Returns:
-            List of batch events that the backfill queue can process
-        """
-        batch_connection_query = """
-            SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
-            /* Find the batch that connects to the given insertion event */
-            INNER JOIN batch_events AS c
-            ON i.next_batch_id = c.batch_id
-            /* Get the depth of the batch start event from the events table */
-            INNER JOIN events AS e ON c.event_id = e.event_id
-            /* Find an insertion event which matches the given event_id */
-            WHERE i.event_id = ?
-            LIMIT ?
-        """
-
-        # Find any batch connections for the given insertion event
-        txn.execute(
-            batch_connection_query,
-            (insertion_event_id, limit),
-        )
-        return [
-            BackfillQueueNavigationItem(
-                depth=row[0],
-                stream_ordering=row[1],
-                event_id=row[2],
-                type=row[3],
-            )
-            for row in txn
-        ]
-
     def _get_connected_prev_event_backfill_results_txn(
         self, txn: LoggingTransaction, event_id: str, limit: int
     ) -> List[BackfillQueueNavigationItem]:
@@ -1472,40 +1310,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
 
             event_id_results.add(event_id)
 
-            # Try and find any potential historical batches of message history.
-            if self.hs.config.experimental.msc2716_enabled:
-                # We need to go and try to find any batch events connected
-                # to a given insertion event (by batch_id). If we find any, we'll
-                # add them to the queue and navigate up the DAG like normal in the
-                # next iteration of the loop.
-                if event_type == EventTypes.MSC2716_INSERTION:
-                    # Find any batch connections for the given insertion event
-                    connected_batch_event_backfill_results = (
-                        self._get_connected_batch_event_backfill_results_txn(
-                            txn, event_id, limit - len(event_id_results)
-                        )
-                    )
-                    logger.debug(
-                        "_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s",
-                        room_id,
-                        connected_batch_event_backfill_results,
-                    )
-                    for (
-                        connected_batch_event_backfill_item
-                    ) in connected_batch_event_backfill_results:
-                        if (
-                            connected_batch_event_backfill_item.event_id
-                            not in event_id_results
-                        ):
-                            queue.put(
-                                (
-                                    -connected_batch_event_backfill_item.depth,
-                                    -connected_batch_event_backfill_item.stream_ordering,
-                                    connected_batch_event_backfill_item.event_id,
-                                    connected_batch_event_backfill_item.type,
-                                )
-                            )
-
             # Now we just look up the DAG by prev_events as normal
             connected_prev_event_backfill_results = (
                 self._get_connected_prev_event_backfill_results_txn(
@@ -1748,19 +1552,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             _delete_old_forward_extrem_cache_txn,
         )
 
-    @trace
-    async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
-        await self.db_pool.simple_upsert(
-            table="insertion_event_extremities",
-            keyvalues={"event_id": event_id},
-            values={
-                "event_id": event_id,
-                "room_id": room_id,
-            },
-            insertion_values={},
-            desc="insert_insertion_extremity",
-        )
-
     async def insert_received_event_to_staging(
         self, origin: str, event: EventBase
     ) -> None:
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 44af3357af..5c9db7554e 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1664,9 +1664,6 @@ class PersistEventsStore:
 
             self._handle_event_relations(txn, event)
 
-            self._handle_insertion_event(txn, event)
-            self._handle_batch_event(txn, event)
-
             # Store the labels for this event.
             labels = event.content.get(EventContentFields.LABELS)
             if labels:
@@ -1927,128 +1924,6 @@ class PersistEventsStore:
                 ),
             )
 
-    def _handle_insertion_event(
-        self, txn: LoggingTransaction, event: EventBase
-    ) -> None:
-        """Handles keeping track of insertion events and edges/connections.
-        Part of MSC2716.
-
-        Args:
-            txn: The database transaction object
-            event: The event to process
-        """
-
-        if event.type != EventTypes.MSC2716_INSERTION:
-            # Not a insertion event
-            return
-
-        # Skip processing an insertion event if the room version doesn't
-        # support it or the event is not from the room creator.
-        room_version = self.store.get_room_version_txn(txn, event.room_id)
-        room_creator = self.db_pool.simple_select_one_onecol_txn(
-            txn,
-            table="rooms",
-            keyvalues={"room_id": event.room_id},
-            retcol="creator",
-            allow_none=True,
-        )
-        if not room_version.msc2716_historical and (
-            not self.hs.config.experimental.msc2716_enabled
-            or event.sender != room_creator
-        ):
-            return
-
-        next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID)
-        if next_batch_id is None:
-            # Invalid insertion event without next batch ID
-            return
-
-        logger.debug(
-            "_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event
-        )
-
-        # Keep track of the insertion event and the batch ID
-        self.db_pool.simple_insert_txn(
-            txn,
-            table="insertion_events",
-            values={
-                "event_id": event.event_id,
-                "room_id": event.room_id,
-                "next_batch_id": next_batch_id,
-            },
-        )
-
-        # Insert an edge for every prev_event connection
-        for prev_event_id in event.prev_event_ids():
-            self.db_pool.simple_insert_txn(
-                txn,
-                table="insertion_event_edges",
-                values={
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "insertion_prev_event_id": prev_event_id,
-                },
-            )
-
-    def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase) -> None:
-        """Handles inserting the batch edges/connections between the batch event
-        and an insertion event. Part of MSC2716.
-
-        Args:
-            txn: The database transaction object
-            event: The event to process
-        """
-
-        if event.type != EventTypes.MSC2716_BATCH:
-            # Not a batch event
-            return
-
-        # Skip processing a batch event if the room version doesn't
-        # support it or the event is not from the room creator.
-        room_version = self.store.get_room_version_txn(txn, event.room_id)
-        room_creator = self.db_pool.simple_select_one_onecol_txn(
-            txn,
-            table="rooms",
-            keyvalues={"room_id": event.room_id},
-            retcol="creator",
-            allow_none=True,
-        )
-        if not room_version.msc2716_historical and (
-            not self.hs.config.experimental.msc2716_enabled
-            or event.sender != room_creator
-        ):
-            return
-
-        batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID)
-        if batch_id is None:
-            # Invalid batch event without a batch ID
-            return
-
-        logger.debug("_handle_batch_event batch_id=%s %s", batch_id, event)
-
-        # Keep track of the insertion event and the batch ID
-        self.db_pool.simple_insert_txn(
-            txn,
-            table="batch_events",
-            values={
-                "event_id": event.event_id,
-                "room_id": event.room_id,
-                "batch_id": batch_id,
-            },
-        )
-
-        # When we receive an event with a `batch_id` referencing the
-        # `next_batch_id` of the insertion event, we can remove it from the
-        # `insertion_event_extremities` table.
-        sql = """
-            DELETE FROM insertion_event_extremities WHERE event_id IN (
-                SELECT event_id FROM insertion_events
-                WHERE next_batch_id = ?
-            )
-        """
-
-        txn.execute(sql, (batch_id,))
-
     def _handle_redact_relations(
         self, txn: LoggingTransaction, room_id: str, redacted_event_id: str
     ) -> None:
diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py
deleted file mode 100644
index 131f357d04..0000000000
--- a/synapse/storage/databases/main/room_batch.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# Copyright 2021 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import Optional
-
-from synapse.storage._base import SQLBaseStore
-
-
-class RoomBatchStore(SQLBaseStore):
-    async def get_insertion_event_id_by_batch_id(
-        self, room_id: str, batch_id: str
-    ) -> Optional[str]:
-        """Retrieve a insertion event ID.
-
-        Args:
-            batch_id: The batch ID of the insertion event to retrieve.
-
-        Returns:
-            The event_id of an insertion event, or None if there is no known
-            insertion event for the given insertion event.
-        """
-        return await self.db_pool.simple_select_one_onecol(
-            table="insertion_events",
-            keyvalues={"room_id": room_id, "next_batch_id": batch_id},
-            retcol="event_id",
-            allow_none=True,
-        )
-
-    async def store_state_group_id_for_event_id(
-        self, event_id: str, state_group_id: int
-    ) -> None:
-        await self.db_pool.simple_upsert(
-            table="event_to_state_groups",
-            keyvalues={"event_id": event_id},
-            values={"state_group": state_group_id, "event_id": event_id},
-        )