diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index f4a00b0736..547e43ab98 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -936,15 +936,46 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
# We want to make sure that we do a breadth-first, "depth" ordered
# search.
- query = (
- "SELECT depth, prev_event_id FROM event_edges"
- " INNER JOIN events"
- " ON prev_event_id = events.event_id"
- " WHERE event_edges.event_id = ?"
- " AND event_edges.is_state = ?"
- " LIMIT ?"
- )
+ # Look for the prev_event_id connected to the given event_id
+ query = """
+ SELECT depth, prev_event_id FROM event_edges
+ /* Get the depth of the prev_event_id from the events table */
+ INNER JOIN events
+ ON prev_event_id = events.event_id
+ /* Find an event which matches the given event_id */
+ WHERE event_edges.event_id = ?
+ AND event_edges.is_state = ?
+ LIMIT ?
+ """
+
+ # Look for the "insertion" events connected to the given event_id
+ connected_insertion_event_query = """
+ SELECT e.depth, i.event_id FROM insertion_event_edges AS i
+ /* Get the depth of the insertion event from the events table */
+ INNER JOIN events AS e USING (event_id)
+ /* Find an insertion event which points via prev_events to the given event_id */
+ WHERE i.insertion_prev_event_id = ?
+ LIMIT ?
+ """
+
+ # Find any chunk connections of a given insertion event
+ chunk_connection_query = """
+ SELECT e.depth, c.event_id FROM insertion_events AS i
+ /* Find the chunk that connects to the given insertion event */
+ INNER JOIN chunk_events AS c
+ ON i.next_chunk_id = c.chunk_id
+ /* Get the depth of the chunk start event from the events table */
+ INNER JOIN events AS e USING (event_id)
+ /* Find an insertion event which matches the given event_id */
+ WHERE i.event_id = ?
+ LIMIT ?
+ """
+ # In a PriorityQueue, the lowest valued entries are retrieved first.
+ # We're using depth as the priority in the queue.
+ # Depth is lowest at the oldest-in-time message and highest at the
+ # newest-in-time message. We add events to the queue with a negative depth so that
+ # we process the newest-in-time messages first going backwards in time.
queue = PriorityQueue()
for event_id in event_list:
@@ -970,9 +1001,48 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
event_results.add(event_id)
+ # Try and find any potential historical chunks of message history.
+ #
+ # First we look for an insertion event connected to the current
+ # event (by prev_event). If we find any, we need to go and try to
+ # find any chunk events connected to the insertion event (by
+ # chunk_id). If we find any, we'll add them to the queue and
+ # navigate up the DAG like normal in the next iteration of the loop.
+ txn.execute(
+ connected_insertion_event_query, (event_id, limit - len(event_results))
+ )
+ connected_insertion_event_id_results = txn.fetchall()
+ logger.debug(
+ "_get_backfill_events: connected_insertion_event_query %s",
+ connected_insertion_event_id_results,
+ )
+ for row in connected_insertion_event_id_results:
+ connected_insertion_event_depth = row[0]
+ connected_insertion_event = row[1]
+ queue.put((-connected_insertion_event_depth, connected_insertion_event))
+
+ # Find any chunk connections for the given insertion event
+ txn.execute(
+ chunk_connection_query,
+ (connected_insertion_event, limit - len(event_results)),
+ )
+ chunk_start_event_id_results = txn.fetchall()
+ logger.debug(
+ "_get_backfill_events: chunk_start_event_id_results %s",
+ chunk_start_event_id_results,
+ )
+ for row in chunk_start_event_id_results:
+ if row[1] not in event_results:
+ queue.put((-row[0], row[1]))
+
+ # Navigate up the DAG by prev_event
txn.execute(query, (event_id, False, limit - len(event_results)))
+ prev_event_id_results = txn.fetchall()
+ logger.debug(
+ "_get_backfill_events: prev_event_ids %s", prev_event_id_results
+ )
- for row in txn:
+ for row in prev_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index a396a201d4..86baf397fb 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1502,6 +1502,9 @@ class PersistEventsStore:
self._handle_event_relations(txn, event)
+ self._handle_insertion_event(txn, event)
+ self._handle_chunk_event(txn, event)
+
# Store the labels for this event.
labels = event.content.get(EventContentFields.LABELS)
if labels:
@@ -1754,6 +1757,94 @@ class PersistEventsStore:
if rel_type == RelationTypes.REPLACE:
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
+ def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase):
+ """Handles keeping track of insertion events and edges/connections.
+ Part of MSC2716.
+
+ Args:
+ txn: The database transaction object
+ event: The event to process
+ """
+
+ if event.type != EventTypes.MSC2716_INSERTION:
+ # Not an insertion event
+ return
+
+ # Skip processing an insertion event if the room version doesn't
+ # support it.
+ room_version = self.store.get_room_version_txn(txn, event.room_id)
+ if not room_version.msc2716_historical:
+ return
+
+ next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
+ if next_chunk_id is None:
+ # Invalid insertion event without next chunk ID
+ return
+
+ logger.debug(
+ "_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
+ )
+
+ # Keep track of the insertion event and its next chunk ID
+ self.db_pool.simple_insert_txn(
+ txn,
+ table="insertion_events",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "next_chunk_id": next_chunk_id,
+ },
+ )
+
+ # Insert one edge row for every prev_event connection of this event
+ for prev_event_id in event.prev_events:
+ self.db_pool.simple_insert_txn(
+ txn,
+ table="insertion_event_edges",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "insertion_prev_event_id": prev_event_id,
+ },
+ )
+
+ def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
+ """Handles inserting the chunk edges/connections between the chunk event
+ and an insertion event. Part of MSC2716.
+
+ Args:
+ txn: The database transaction object
+ event: The event to process
+ """
+
+ if event.type != EventTypes.MSC2716_CHUNK:
+ # Not a chunk event
+ return
+
+ # Skip processing a chunk event if the room version doesn't
+ # support it.
+ room_version = self.store.get_room_version_txn(txn, event.room_id)
+ if not room_version.msc2716_historical:
+ return
+
+ chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
+ if chunk_id is None:
+ # Invalid chunk event without a chunk ID
+ return
+
+ logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event)
+
+ # Keep track of the chunk event and its chunk ID
+ self.db_pool.simple_insert_txn(
+ txn,
+ table="chunk_events",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "chunk_id": chunk_id,
+ },
+ )
+
def _handle_redaction(self, txn, redacted_event_id):
"""Handles receiving a redaction and checking whether we need to remove
any redacted relations from the database.
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 1757064a68..8e22da99ae 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -22,7 +22,7 @@ from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateFilter
@@ -58,15 +58,32 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
async def get_room_version(self, room_id: str) -> RoomVersion:
"""Get the room_version of a given room
-
Raises:
NotFoundError: if the room is unknown
+ UnsupportedRoomVersionError: if the room uses an unknown room version.
+ Typically this happens if support for the room's version has been
+ removed from Synapse.
+ """
+ return await self.db_pool.runInteraction(
+ "get_room_version_txn",
+ self.get_room_version_txn,
+ room_id,
+ )
+ def get_room_version_txn(
+ self, txn: LoggingTransaction, room_id: str
+ ) -> RoomVersion:
+ """Get the room_version of a given room
+ Args:
+ txn: Transaction object
+ room_id: The room_id of the room you are trying to get the version for
+ Raises:
+ NotFoundError: if the room is unknown
UnsupportedRoomVersionError: if the room uses an unknown room version.
Typically this happens if support for the room's version has been
removed from Synapse.
"""
- room_version_id = await self.get_room_version_id(room_id)
+ room_version_id = self.get_room_version_id_txn(txn, room_id)
v = KNOWN_ROOM_VERSIONS.get(room_version_id)
if not v:
@@ -80,7 +97,20 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
@cached(max_entries=10000)
async def get_room_version_id(self, room_id: str) -> str:
"""Get the room_version of a given room
+ Raises:
+ NotFoundError: if the room is unknown
+ """
+ return await self.db_pool.runInteraction(
+ "get_room_version_id_txn",
+ self.get_room_version_id_txn,
+ room_id,
+ )
+ def get_room_version_id_txn(self, txn: LoggingTransaction, room_id: str) -> str:
+ """Get the room version ID (the `room_version` column) of a given room
+ Args:
+ txn: Transaction object
+ room_id: The room_id of the room you are trying to get the version for
Raises:
NotFoundError: if the room is unknown
"""
@@ -88,24 +118,22 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
# First we try looking up room version from the database, but for old
# rooms we might not have added the room version to it yet so we fall
# back to previous behaviour and look in current state events.
-
+ #
# We really should have an entry in the rooms table for every room we
# care about, but let's be a bit paranoid (at least while the background
# update is happening) to avoid breaking existing rooms.
- version = await self.db_pool.simple_select_one_onecol(
+ room_version = self.db_pool.simple_select_one_onecol_txn(
+ txn,
table="rooms",
keyvalues={"room_id": room_id},
retcol="room_version",
- desc="get_room_version",
allow_none=True,
)
- if version is not None:
- return version
+ if room_version is None:
+ raise NotFoundError("Could not find room_version for %s" % (room_id,))
- # Retrieve the room's create event
- create_event = await self.get_create_event_for_room(room_id)
- return create_event.content.get("room_version", "1")
+ return room_version
async def get_room_predecessor(self, room_id: str) -> Optional[dict]:
"""Get the predecessor of an upgraded room if it exists.
diff --git a/synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql b/synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql
new file mode 100644
index 0000000000..7d7bafc631
--- /dev/null
+++ b/synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql
@@ -0,0 +1,49 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add a table that keeps track of "insertion" events and
+-- their `next_chunk_id`s so we can navigate to the next chunk of history.
+CREATE TABLE IF NOT EXISTS insertion_events(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ next_chunk_id TEXT NOT NULL
+);
+CREATE UNIQUE INDEX IF NOT EXISTS insertion_events_event_id ON insertion_events(event_id);
+CREATE INDEX IF NOT EXISTS insertion_events_next_chunk_id ON insertion_events(next_chunk_id);
+
+-- Add a table that keeps track of all of the events we are inserting between.
+-- We use this when navigating the DAG and when we hit an event which matches
+-- `insertion_prev_event_id`, it should backfill from the "insertion" event and
+-- navigate the historical messages from there.
+CREATE TABLE IF NOT EXISTS insertion_event_edges(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ insertion_prev_event_id TEXT NOT NULL
+);
+-- One row is stored per prev_event of an insertion event, so event_id is not unique here.
+CREATE INDEX IF NOT EXISTS insertion_event_edges_event_id ON insertion_event_edges(event_id);
+CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_room_id ON insertion_event_edges(room_id);
+CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_prev_event_id ON insertion_event_edges(insertion_prev_event_id);
+
+-- Add a table that keeps track of how each chunk is labeled. The chunks are
+-- connected together based on an insertion event's `next_chunk_id`.
+CREATE TABLE IF NOT EXISTS chunk_events(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ chunk_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS chunk_events_event_id ON chunk_events(event_id);
+CREATE INDEX IF NOT EXISTS chunk_events_chunk_id ON chunk_events(chunk_id);
|