diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 0032a92f49..3a10c265c9 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -61,7 +61,6 @@ from .registration import RegistrationStore
from .rejections import RejectionsStore
from .relations import RelationsStore
from .room import RoomStore
-from .room_batch import RoomBatchStore
from .roommember import RoomMemberStore
from .search import SearchStore
from .session import SessionStore
@@ -87,7 +86,6 @@ class DataStore(
DeviceStore,
RoomMemberStore,
RoomStore,
- RoomBatchStore,
RegistrationStore,
ProfileStore,
PresenceStore,
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 2681917d0b..8b6e3c1dc7 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -31,7 +31,7 @@ from typing import (
import attr
from prometheus_client import Counter, Gauge
-from synapse.api.constants import MAX_DEPTH, EventTypes
+from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError
from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.events import EventBase, make_event_from_dict
@@ -891,124 +891,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
room_id,
)
- @trace
- async def get_insertion_event_backward_extremities_in_room(
- self,
- room_id: str,
- current_depth: int,
- limit: int,
- ) -> List[Tuple[str, int]]:
- """
- Get the insertion events we know about that we haven't backfilled yet
- along with the approximate depth. Only returns insertion events that are
- at a depth lower than or equal to the `current_depth`. Sorted by depth,
- highest to lowest (descending) so the closest events to the
- `current_depth` are first in the list.
-
- We ignore insertion events that are newer than the user's current scroll
- position (i.e. those with depth greater than `current_depth`) as:
- 1. we don't really care about getting events that have happened
- after our current position; and
- 2. by the nature of paginating and scrolling back, we have likely
- previously tried and failed to backfill from that insertion event, so
- to avoid getting "stuck" requesting the same backfill repeatedly
- we drop those insertion events.
-
- Args:
- room_id: Room where we want to find the oldest events
- current_depth: The depth at the user's current scrollback position
- limit: The max number of insertion event extremities to return
-
- Returns:
- List of (event_id, depth) tuples. Sorted by depth, highest to lowest
- (descending) so the closest events to the `current_depth` are first
- in the list.
- """
-
- def get_insertion_event_backward_extremities_in_room_txn(
- txn: LoggingTransaction, room_id: str
- ) -> List[Tuple[str, int]]:
- if isinstance(self.database_engine, PostgresEngine):
- least_function = "LEAST"
- elif isinstance(self.database_engine, Sqlite3Engine):
- least_function = "MIN"
- else:
- raise RuntimeError("Unknown database engine")
-
- sql = f"""
- SELECT
- insertion_event_extremity.event_id, event.depth
- /* We only want insertion events that are also marked as backwards extremities */
- FROM insertion_event_extremities AS insertion_event_extremity
- /* Get the depth of the insertion event from the events table */
- INNER JOIN events AS event USING (event_id)
- /**
- * We use this info to make sure we don't retry a backfill point
- * if we've already attempted to backfill from it recently.
- */
- LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info
- ON
- failed_backfill_attempt_info.room_id = insertion_event_extremity.room_id
- AND failed_backfill_attempt_info.event_id = insertion_event_extremity.event_id
- WHERE
- insertion_event_extremity.room_id = ?
- /**
- * We only want extremities that are older than or at
- * the same position of the given `current_depth` (where older
- * means less than the given depth) because we're looking backwards
- * from the `current_depth` when backfilling.
- *
- * current_depth (ignore events that come after this, ignore 2-4)
- * |
- * ▼
- * <oldest-in-time> [0]<--[1]<--[2]<--[3]<--[4] <newest-in-time>
- */
- AND event.depth <= ? /* current_depth */
- /**
- * Exponential back-off (up to the upper bound) so we don't retry the
- * same backfill point over and over, e.g. 2hr, 4hr, 8hr, 16hr, etc.
- *
- * We use `1 << n` as a power of 2 equivalent for compatibility
- * with older SQLites. The left shift equivalent only works with
- * powers of 2 because left shift is a binary operation (base-2).
- * Otherwise, we would use `power(2, n)` or the power operator, `2^n`.
- */
- AND (
- failed_backfill_attempt_info.event_id IS NULL
- OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + (
- (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */))
- * ? /* step */
- )
- )
- /**
- * Sort from highest (closest to the `current_depth`) to the lowest depth
- * because the closest are most relevant to backfill from first.
- * Then tie-break on alphabetical order of the event_ids so we get a
- * consistent ordering which is nice when asserting things in tests.
- */
- ORDER BY event.depth DESC, insertion_event_extremity.event_id DESC
- LIMIT ?
- """
-
- txn.execute(
- sql,
- (
- room_id,
- current_depth,
- self._clock.time_msec(),
- BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
- BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS,
- limit,
- ),
- )
- return cast(List[Tuple[str, int]], txn.fetchall())
-
- return await self.db_pool.runInteraction(
- "get_insertion_event_backward_extremities_in_room",
- get_insertion_event_backward_extremities_in_room_txn,
- room_id,
- )
-
async def get_max_depth_of(
self, event_ids: Collection[str]
) -> Tuple[Optional[str], int]:
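Note on the hunk above: the `(1 << LEAST(num_attempts, max_steps)) * step` expression in the deleted query implements capped exponential back-off per failed backfill attempt. A minimal Python sketch of the same gate, with illustrative constants (the real `BACKFILL_EVENT_EXPONENTIAL_BACKOFF_*` values live elsewhere in `event_federation.py` and are not shown in this diff):

```python
# Sketch of the retry gate the removed SQL encoded; constant values are assumptions.
MAX_DOUBLING_STEPS = 8        # assumed cap on the number of doublings
STEP_MS = 2 * 60 * 60 * 1000  # assumed base step (2 hours), doubling each attempt

def may_retry_backfill(num_attempts: int, last_attempt_ts_ms: int, now_ms: int) -> bool:
    """True once the capped exponential back-off window has elapsed."""
    # `1 << n` mirrors the SQL's shift-based power of 2 (works on old SQLite too).
    backoff_ms = (1 << min(num_attempts, MAX_DOUBLING_STEPS)) * STEP_MS
    return now_ms >= last_attempt_ts_ms + backoff_ms
```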
@@ -1280,50 +1162,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
return event_ids
- def _get_connected_batch_event_backfill_results_txn(
- self, txn: LoggingTransaction, insertion_event_id: str, limit: int
- ) -> List[BackfillQueueNavigationItem]:
- """
- Find any batch connections of a given insertion event.
- A batch event points at an insertion event via:
- batch_event.content[MSC2716_BATCH_ID] -> insertion_event.content[MSC2716_NEXT_BATCH_ID]
-
- Args:
- txn: The database transaction to use
- insertion_event_id: The event ID to navigate from. We will find
- batch events that point back at this insertion event.
- limit: Max number of event IDs to query for and return
-
- Returns:
- List of batch events that the backfill queue can process
- """
- batch_connection_query = """
- SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
- /* Find the batch that connects to the given insertion event */
- INNER JOIN batch_events AS c
- ON i.next_batch_id = c.batch_id
- /* Get the depth of the batch start event from the events table */
- INNER JOIN events AS e ON c.event_id = e.event_id
- /* Find an insertion event which matches the given event_id */
- WHERE i.event_id = ?
- LIMIT ?
- """
-
- # Find any batch connections for the given insertion event
- txn.execute(
- batch_connection_query,
- (insertion_event_id, limit),
- )
- return [
- BackfillQueueNavigationItem(
- depth=row[0],
- stream_ordering=row[1],
- event_id=row[2],
- type=row[3],
- )
- for row in txn
- ]
-
def _get_connected_prev_event_backfill_results_txn(
self, txn: LoggingTransaction, event_id: str, limit: int
) -> List[BackfillQueueNavigationItem]:
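To make the deleted join above concrete: a batch event references an insertion event by carrying the insertion event's `next_batch_id` as its own `batch_id`. A sketch of that linkage using the constants from `synapse.api.constants` (the event contents here are made up):

```python
from synapse.api.constants import EventContentFields

# Hypothetical event contents, showing only the fields the deleted join matched on.
insertion_content = {EventContentFields.MSC2716_NEXT_BATCH_ID: "batch_abc"}
batch_content = {EventContentFields.MSC2716_BATCH_ID: "batch_abc"}

# The removed `_get_connected_batch_event_backfill_results_txn` joined
# `insertion_events.next_batch_id` against `batch_events.batch_id`:
assert (
    batch_content[EventContentFields.MSC2716_BATCH_ID]
    == insertion_content[EventContentFields.MSC2716_NEXT_BATCH_ID]
)
```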
@@ -1472,40 +1310,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
event_id_results.add(event_id)
- # Try and find any potential historical batches of message history.
- if self.hs.config.experimental.msc2716_enabled:
- # We need to go and try to find any batch events connected
- # to a given insertion event (by batch_id). If we find any, we'll
- # add them to the queue and navigate up the DAG like normal in the
- # next iteration of the loop.
- if event_type == EventTypes.MSC2716_INSERTION:
- # Find any batch connections for the given insertion event
- connected_batch_event_backfill_results = (
- self._get_connected_batch_event_backfill_results_txn(
- txn, event_id, limit - len(event_id_results)
- )
- )
- logger.debug(
- "_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s",
- room_id,
- connected_batch_event_backfill_results,
- )
- for (
- connected_batch_event_backfill_item
- ) in connected_batch_event_backfill_results:
- if (
- connected_batch_event_backfill_item.event_id
- not in event_id_results
- ):
- queue.put(
- (
- -connected_batch_event_backfill_item.depth,
- -connected_batch_event_backfill_item.stream_ordering,
- connected_batch_event_backfill_item.event_id,
- connected_batch_event_backfill_item.type,
- )
- )
-
# Now we just look up the DAG by prev_events as normal
connected_prev_event_backfill_results = (
self._get_connected_prev_event_backfill_results_txn(
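Context for the removed block: the backfill walk drains a min-heap priority queue, pushing `(-depth, -stream_ordering, event_id, type)` so that the deepest events pop first. A toy illustration of that ordering trick (the real code uses a `PriorityQueue`; event values here are invented):

```python
import heapq

# Negating depth and stream_ordering makes a min-heap yield the highest-depth
# events (those closest to the user's scrollback position) first.
queue: list = []
heapq.heappush(queue, (-10, -105, "$shallower", "m.room.message"))
heapq.heappush(queue, (-12, -130, "$deeper", "m.room.message"))

neg_depth, neg_stream_ordering, event_id, event_type = heapq.heappop(queue)
assert event_id == "$deeper"  # depth 12 sorts before depth 10 after negation
```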
@@ -1748,19 +1552,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
_delete_old_forward_extrem_cache_txn,
)
- @trace
- async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
- await self.db_pool.simple_upsert(
- table="insertion_event_extremities",
- keyvalues={"event_id": event_id},
- values={
- "event_id": event_id,
- "room_id": room_id,
- },
- insertion_values={},
- desc="insert_insertion_extremity",
- )
-
async def insert_received_event_to_staging(
self, origin: str, event: EventBase
) -> None:
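Tying the two halves of this removal together: `insert_insertion_extremity` (deleted above) recorded an insertion event as a backfill frontier, and `_handle_batch_event` (deleted from `events.py` below) cleared that row once a batch event arrived whose `batch_id` matched the insertion event's `next_batch_id`. A toy in-memory model of that lifecycle, for illustration only:

```python
# Illustrative stand-in for the insertion_events / insertion_event_extremities tables.
insertion_next_batch: dict = {}  # insertion event_id -> next_batch_id
extremities: set = set()         # insertion event_ids still awaiting backfill

def on_insertion_event(event_id: str, next_batch_id: str) -> None:
    # Mirrors recording the insertion event and marking it as an extremity.
    insertion_next_batch[event_id] = next_batch_id
    extremities.add(event_id)

def on_batch_event(batch_id: str) -> None:
    # Mirrors the deleted `DELETE FROM insertion_event_extremities WHERE event_id IN
    # (SELECT event_id FROM insertion_events WHERE next_batch_id = ?)`.
    for event_id, next_batch in list(insertion_next_batch.items()):
        if next_batch == batch_id:
            extremities.discard(event_id)
```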
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 44af3357af..5c9db7554e 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1664,9 +1664,6 @@ class PersistEventsStore:
self._handle_event_relations(txn, event)
- self._handle_insertion_event(txn, event)
- self._handle_batch_event(txn, event)
-
# Store the labels for this event.
labels = event.content.get(EventContentFields.LABELS)
if labels:
@@ -1927,128 +1924,6 @@ class PersistEventsStore:
),
)
- def _handle_insertion_event(
- self, txn: LoggingTransaction, event: EventBase
- ) -> None:
- """Handles keeping track of insertion events and edges/connections.
- Part of MSC2716.
-
- Args:
- txn: The database transaction object
- event: The event to process
- """
-
- if event.type != EventTypes.MSC2716_INSERTION:
- # Not an insertion event
- return
-
- # Skip processing an insertion event if the room version doesn't
- # support it or the event is not from the room creator.
- room_version = self.store.get_room_version_txn(txn, event.room_id)
- room_creator = self.db_pool.simple_select_one_onecol_txn(
- txn,
- table="rooms",
- keyvalues={"room_id": event.room_id},
- retcol="creator",
- allow_none=True,
- )
- if not room_version.msc2716_historical and (
- not self.hs.config.experimental.msc2716_enabled
- or event.sender != room_creator
- ):
- return
-
- next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID)
- if next_batch_id is None:
- # Invalid insertion event without next batch ID
- return
-
- logger.debug(
- "_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event
- )
-
- # Keep track of the insertion event and the batch ID
- self.db_pool.simple_insert_txn(
- txn,
- table="insertion_events",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "next_batch_id": next_batch_id,
- },
- )
-
- # Insert an edge for every prev_event connection
- for prev_event_id in event.prev_event_ids():
- self.db_pool.simple_insert_txn(
- txn,
- table="insertion_event_edges",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "insertion_prev_event_id": prev_event_id,
- },
- )
-
- def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase) -> None:
- """Handles inserting the batch edges/connections between the batch event
- and an insertion event. Part of MSC2716.
-
- Args:
- txn: The database transaction object
- event: The event to process
- """
-
- if event.type != EventTypes.MSC2716_BATCH:
- # Not a batch event
- return
-
- # Skip processing a batch event if the room version doesn't
- # support it or the event is not from the room creator.
- room_version = self.store.get_room_version_txn(txn, event.room_id)
- room_creator = self.db_pool.simple_select_one_onecol_txn(
- txn,
- table="rooms",
- keyvalues={"room_id": event.room_id},
- retcol="creator",
- allow_none=True,
- )
- if not room_version.msc2716_historical and (
- not self.hs.config.experimental.msc2716_enabled
- or event.sender != room_creator
- ):
- return
-
- batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID)
- if batch_id is None:
- # Invalid batch event without a batch ID
- return
-
- logger.debug("_handle_batch_event batch_id=%s %s", batch_id, event)
-
- # Keep track of the batch event and the batch ID
- self.db_pool.simple_insert_txn(
- txn,
- table="batch_events",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "batch_id": batch_id,
- },
- )
-
- # When we receive an event with a `batch_id` referencing the
- # `next_batch_id` of the insertion event, we can remove it from the
- # `insertion_event_extremities` table.
- sql = """
- DELETE FROM insertion_event_extremities WHERE event_id IN (
- SELECT event_id FROM insertion_events
- WHERE next_batch_id = ?
- )
- """
-
- txn.execute(sql, (batch_id,))
-
def _handle_redact_relations(
self, txn: LoggingTransaction, room_id: str, redacted_event_id: str
) -> None:
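One subtlety in the deleted handlers above: the early-return guard reads as a double negative. Rewritten as a positive predicate (a sketch; logically equivalent by De Morgan's laws):

```python
def should_process_msc2716_event(
    room_version_supports_historical: bool,
    msc2716_enabled: bool,
    sender: str,
    room_creator: str,
) -> bool:
    # Process MSC2716 events when the room version supports historical events
    # natively, or when the experimental flag is on and the sender created the room.
    return room_version_supports_historical or (
        msc2716_enabled and sender == room_creator
    )
```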
diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py
deleted file mode 100644
index 131f357d04..0000000000
--- a/synapse/storage/databases/main/room_batch.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# Copyright 2021 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import Optional
-
-from synapse.storage._base import SQLBaseStore
-
-
-class RoomBatchStore(SQLBaseStore):
- async def get_insertion_event_id_by_batch_id(
- self, room_id: str, batch_id: str
- ) -> Optional[str]:
- """Retrieve a insertion event ID.
-
- Args:
- batch_id: The batch ID of the insertion event to retrieve.
-
- Returns:
- The event_id of an insertion event, or None if there is no known
- insertion event for the given insertion event.
- """
- return await self.db_pool.simple_select_one_onecol(
- table="insertion_events",
- keyvalues={"room_id": room_id, "next_batch_id": batch_id},
- retcol="event_id",
- allow_none=True,
- )
-
- async def store_state_group_id_for_event_id(
- self, event_id: str, state_group_id: int
- ) -> None:
- await self.db_pool.simple_upsert(
- table="event_to_state_groups",
- keyvalues={"event_id": event_id},
- values={"state_group": state_group_id, "event_id": event_id},
- )
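For reference, a hypothetical call into the removed `RoomBatchStore` (the `store` handle and IDs are invented), showing how a batch ID was resolved back to its insertion event:

```python
from typing import Optional

async def resolve_batch(store, room_id: str, batch_id: str) -> Optional[str]:
    # Hypothetical usage of the deleted method; returns None when no insertion
    # event in the room advertised this batch ID via next_batch_id.
    return await store.get_insertion_event_id_by_batch_id(
        room_id=room_id, batch_id=batch_id
    )
```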