summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/database.py14
-rw-r--r--synapse/storage/databases/main/event_federation.py114
-rw-r--r--synapse/storage/databases/main/events.py24
-rw-r--r--synapse/storage/schema/__init__.py2
-rw-r--r--synapse/storage/schema/main/delta/62/01insertion_event_extremities.sql24
5 files changed, 150 insertions, 28 deletions
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index c8015a3848..95d2caff62 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -941,13 +941,13 @@ class DatabasePool:
 
         `lock` should generally be set to True (the default), but can be set
         to False if either of the following are true:
-
-        * there is a UNIQUE INDEX on the key columns. In this case a conflict
-          will cause an IntegrityError in which case this function will retry
-          the update.
-
-        * we somehow know that we are the only thread which will be updating
-          this table.
+            1. there is a UNIQUE INDEX on the key columns. In this case a conflict
+            will cause an IntegrityError in which case this function will retry
+            the update.
+            2. we somehow know that we are the only thread which will be updating
+            this table.
+        As an additional note, this parameter only matters for old SQLite versions
+        because we will use native upserts otherwise.
 
         Args:
             table: The table to upsert into
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 44018c1c31..bddf5ef192 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -671,27 +671,97 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         # Return all events where not all sets can reach them.
         return {eid for eid, n in event_to_missing_sets.items() if n}
 
-    async def get_oldest_events_with_depth_in_room(self, room_id):
+    async def get_oldest_event_ids_with_depth_in_room(self, room_id) -> Dict[str, int]:
+        """Gets the oldest events(backwards extremities) in the room along with the
+        aproximate depth.
+
+        We use this function so that we can compare and see if someones current
+        depth at their current scrollback is within pagination range of the
+        event extremeties. If the current depth is close to the depth of given
+        oldest event, we can trigger a backfill.
+
+        Args:
+            room_id: Room where we want to find the oldest events
+
+        Returns:
+            Map from event_id to depth
+        """
+
+        def get_oldest_event_ids_with_depth_in_room_txn(txn, room_id):
+            # Assemble a dictionary with event_id -> depth for the oldest events
+            # we know of in the room. Backwards extremeties are the oldest
+            # events we know of in the room but we only know of them because
+            # some other event referenced them by prev_event and aren't peristed
+            # in our database yet (meaning we don't know their depth
+            # specifically). So we need to look for the aproximate depth from
+            # the events connected to the current backwards extremeties.
+            sql = """
+                SELECT b.event_id, MAX(e.depth) FROM events as e
+                /**
+                 * Get the edge connections from the event_edges table
+                 * so we can see whether this event's prev_events points
+                 * to a backward extremity in the next join.
+                 */
+                INNER JOIN event_edges as g
+                ON g.event_id = e.event_id
+                /**
+                 * We find the "oldest" events in the room by looking for
+                 * events connected to backwards extremeties (oldest events
+                 * in the room that we know of so far).
+                 */
+                INNER JOIN event_backward_extremities as b
+                ON g.prev_event_id = b.event_id
+                WHERE b.room_id = ? AND g.is_state is ?
+                GROUP BY b.event_id
+            """
+
+            txn.execute(sql, (room_id, False))
+
+            return dict(txn)
+
         return await self.db_pool.runInteraction(
-            "get_oldest_events_with_depth_in_room",
-            self.get_oldest_events_with_depth_in_room_txn,
+            "get_oldest_event_ids_with_depth_in_room",
+            get_oldest_event_ids_with_depth_in_room_txn,
             room_id,
         )
 
-    def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
-        sql = (
-            "SELECT b.event_id, MAX(e.depth) FROM events as e"
-            " INNER JOIN event_edges as g"
-            " ON g.event_id = e.event_id"
-            " INNER JOIN event_backward_extremities as b"
-            " ON g.prev_event_id = b.event_id"
-            " WHERE b.room_id = ? AND g.is_state is ?"
-            " GROUP BY b.event_id"
-        )
+    async def get_insertion_event_backwards_extremities_in_room(
+        self, room_id
+    ) -> Dict[str, int]:
+        """Get the insertion events we know about that we haven't backfilled yet.
 
-        txn.execute(sql, (room_id, False))
+        We use this function so that we can compare and see if someones current
+        depth at their current scrollback is within pagination range of the
+        insertion event. If the current depth is close to the depth of given
+        insertion event, we can trigger a backfill.
 
-        return dict(txn)
+        Args:
+            room_id: Room where we want to find the oldest events
+
+        Returns:
+            Map from event_id to depth
+        """
+
+        def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id):
+            sql = """
+                SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
+                /* We only want insertion events that are also marked as backwards extremities */
+                INNER JOIN insertion_event_extremities as b USING (event_id)
+                /* Get the depth of the insertion event from the events table */
+                INNER JOIN events AS e USING (event_id)
+                WHERE b.room_id = ?
+                GROUP BY b.event_id
+            """
+
+            txn.execute(sql, (room_id,))
+
+            return dict(txn)
+
+        return await self.db_pool.runInteraction(
+            "get_insertion_event_backwards_extremities_in_room",
+            get_insertion_event_backwards_extremities_in_room_txn,
+            room_id,
+        )
 
     async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[str, int]:
         """Returns the event ID and depth for the event that has the max depth from a set of event IDs
@@ -1041,7 +1111,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                     if row[1] not in event_results:
                         queue.put((-row[0], row[1]))
 
-            # Navigate up the DAG by prev_event
             txn.execute(query, (event_id, False, limit - len(event_results)))
             prev_event_id_results = txn.fetchall()
             logger.debug(
@@ -1136,6 +1205,19 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             _delete_old_forward_extrem_cache_txn,
         )
 
+    async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
+        await self.db_pool.simple_upsert(
+            table="insertion_event_extremities",
+            keyvalues={"event_id": event_id},
+            values={
+                "event_id": event_id,
+                "room_id": room_id,
+            },
+            insertion_values={},
+            desc="insert_insertion_extremity",
+            lock=False,
+        )
+
     async def insert_received_event_to_staging(
         self, origin: str, event: EventBase
     ) -> None:
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 86baf397fb..40b53274fb 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1845,6 +1845,18 @@ class PersistEventsStore:
             },
         )
 
+        # When we receive an event with a `chunk_id` referencing the
+        # `next_chunk_id` of the insertion event, we can remove it from the
+        # `insertion_event_extremities` table.
+        sql = """
+            DELETE FROM insertion_event_extremities WHERE event_id IN (
+                SELECT event_id FROM insertion_events
+                WHERE next_chunk_id = ?
+            )
+        """
+
+        txn.execute(sql, (chunk_id,))
+
     def _handle_redaction(self, txn, redacted_event_id):
         """Handles receiving a redaction and checking whether we need to remove
         any redacted relations from the database.
@@ -2101,15 +2113,17 @@ class PersistEventsStore:
 
         Forward extremities are handled when we first start persisting the events.
         """
+        # From the events passed in, add all of the prev events as backwards extremities.
+        # Ignore any events that are already backwards extrems or outliers.
         query = (
             "INSERT INTO event_backward_extremities (event_id, room_id)"
             " SELECT ?, ? WHERE NOT EXISTS ("
-            " SELECT 1 FROM event_backward_extremities"
-            " WHERE event_id = ? AND room_id = ?"
+            "   SELECT 1 FROM event_backward_extremities"
+            "   WHERE event_id = ? AND room_id = ?"
             " )"
             " AND NOT EXISTS ("
-            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
-            " AND outlier = ?"
+            "   SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+            "   AND outlier = ?"
             " )"
         )
 
@@ -2123,6 +2137,8 @@ class PersistEventsStore:
             ],
         )
 
+        # Delete all these events that we've already fetched and now know that their
+        # prev events are the new backwards extremeties.
         query = (
             "DELETE FROM event_backward_extremities"
             " WHERE event_id = ? AND room_id = ?"
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 36340a652a..fd4dd67d91 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SCHEMA_VERSION = 61
+SCHEMA_VERSION = 62
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
diff --git a/synapse/storage/schema/main/delta/62/01insertion_event_extremities.sql b/synapse/storage/schema/main/delta/62/01insertion_event_extremities.sql
new file mode 100644
index 0000000000..b731ef284a
--- /dev/null
+++ b/synapse/storage/schema/main/delta/62/01insertion_event_extremities.sql
@@ -0,0 +1,24 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- Add a table that keeps track of which "insertion" events need to be backfilled
+CREATE TABLE IF NOT EXISTS insertion_event_extremities(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS insertion_event_extremities_event_id ON insertion_event_extremities(event_id);
+CREATE INDEX IF NOT EXISTS insertion_event_extremities_room_id ON insertion_event_extremities(room_id);