summary refs log tree commit diff
path: root/synapse/storage/databases/main/event_federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/event_federation.py')
-rw-r--r--synapse/storage/databases/main/event_federation.py114
1 files changed, 98 insertions, 16 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 44018c1c31..bddf5ef192 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -671,27 +671,97 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         # Return all events where not all sets can reach them.
         return {eid for eid, n in event_to_missing_sets.items() if n}
 
-    async def get_oldest_events_with_depth_in_room(self, room_id):
+    async def get_oldest_event_ids_with_depth_in_room(self, room_id) -> Dict[str, int]:
+        """Gets the oldest events(backwards extremities) in the room along with the
+        aproximate depth.
+
+        We use this function so that we can compare and see if someones current
+        depth at their current scrollback is within pagination range of the
+        event extremeties. If the current depth is close to the depth of given
+        oldest event, we can trigger a backfill.
+
+        Args:
+            room_id: Room where we want to find the oldest events
+
+        Returns:
+            Map from event_id to depth
+        """
+
+        def get_oldest_event_ids_with_depth_in_room_txn(txn, room_id):
+            # Assemble a dictionary with event_id -> depth for the oldest events
+            # we know of in the room. Backwards extremeties are the oldest
+            # events we know of in the room but we only know of them because
+            # some other event referenced them by prev_event and aren't peristed
+            # in our database yet (meaning we don't know their depth
+            # specifically). So we need to look for the aproximate depth from
+            # the events connected to the current backwards extremeties.
+            sql = """
+                SELECT b.event_id, MAX(e.depth) FROM events as e
+                /**
+                 * Get the edge connections from the event_edges table
+                 * so we can see whether this event's prev_events points
+                 * to a backward extremity in the next join.
+                 */
+                INNER JOIN event_edges as g
+                ON g.event_id = e.event_id
+                /**
+                 * We find the "oldest" events in the room by looking for
+                 * events connected to backwards extremeties (oldest events
+                 * in the room that we know of so far).
+                 */
+                INNER JOIN event_backward_extremities as b
+                ON g.prev_event_id = b.event_id
+                WHERE b.room_id = ? AND g.is_state is ?
+                GROUP BY b.event_id
+            """
+
+            txn.execute(sql, (room_id, False))
+
+            return dict(txn)
+
         return await self.db_pool.runInteraction(
-            "get_oldest_events_with_depth_in_room",
-            self.get_oldest_events_with_depth_in_room_txn,
+            "get_oldest_event_ids_with_depth_in_room",
+            get_oldest_event_ids_with_depth_in_room_txn,
             room_id,
         )
 
-    def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
-        sql = (
-            "SELECT b.event_id, MAX(e.depth) FROM events as e"
-            " INNER JOIN event_edges as g"
-            " ON g.event_id = e.event_id"
-            " INNER JOIN event_backward_extremities as b"
-            " ON g.prev_event_id = b.event_id"
-            " WHERE b.room_id = ? AND g.is_state is ?"
-            " GROUP BY b.event_id"
-        )
+    async def get_insertion_event_backwards_extremities_in_room(
+        self, room_id
+    ) -> Dict[str, int]:
+        """Get the insertion events we know about that we haven't backfilled yet.
 
-        txn.execute(sql, (room_id, False))
+        We use this function so that we can compare and see if someones current
+        depth at their current scrollback is within pagination range of the
+        insertion event. If the current depth is close to the depth of given
+        insertion event, we can trigger a backfill.
 
-        return dict(txn)
+        Args:
+            room_id: Room where we want to find the oldest events
+
+        Returns:
+            Map from event_id to depth
+        """
+
+        def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id):
+            sql = """
+                SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
+                /* We only want insertion events that are also marked as backwards extremities */
+                INNER JOIN insertion_event_extremities as b USING (event_id)
+                /* Get the depth of the insertion event from the events table */
+                INNER JOIN events AS e USING (event_id)
+                WHERE b.room_id = ?
+                GROUP BY b.event_id
+            """
+
+            txn.execute(sql, (room_id,))
+
+            return dict(txn)
+
+        return await self.db_pool.runInteraction(
+            "get_insertion_event_backwards_extremities_in_room",
+            get_insertion_event_backwards_extremities_in_room_txn,
+            room_id,
+        )
 
     async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[str, int]:
         """Returns the event ID and depth for the event that has the max depth from a set of event IDs
@@ -1041,7 +1111,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                     if row[1] not in event_results:
                         queue.put((-row[0], row[1]))
 
-            # Navigate up the DAG by prev_event
             txn.execute(query, (event_id, False, limit - len(event_results)))
             prev_event_id_results = txn.fetchall()
             logger.debug(
@@ -1136,6 +1205,19 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             _delete_old_forward_extrem_cache_txn,
         )
 
+    async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
+        await self.db_pool.simple_upsert(
+            table="insertion_event_extremities",
+            keyvalues={"event_id": event_id},
+            values={
+                "event_id": event_id,
+                "room_id": room_id,
+            },
+            insertion_values={},
+            desc="insert_insertion_extremity",
+            lock=False,
+        )
+
     async def insert_received_event_to_staging(
         self, origin: str, event: EventBase
     ) -> None: