summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/event_federation.py188
1 files changed, 147 insertions, 41 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index ef477978ed..3251fca6fb 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import datetime
 import itertools
 import logging
 from queue import Empty, PriorityQueue
@@ -43,7 +44,7 @@ from synapse.storage.database import (
 )
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.signatures import SignatureWorkerStore
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
@@ -72,6 +73,13 @@ pdus_pruned_from_federation_queue = Counter(
 
 logger = logging.getLogger(__name__)
 
+BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS: int = int(
+    datetime.timedelta(days=7).total_seconds()
+)
+BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS: int = int(
+    datetime.timedelta(hours=1).total_seconds()
+)
+
 
 # All the info we need while iterating the DAG while backfilling
 @attr.s(frozen=True, slots=True, auto_attribs=True)
@@ -715,96 +723,189 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
 
     @trace
     @tag_args
-    async def get_oldest_event_ids_with_depth_in_room(
-        self, room_id: str
+    async def get_backfill_points_in_room(
+        self,
+        room_id: str,
     ) -> List[Tuple[str, int]]:
-        """Gets the oldest events(backwards extremities) in the room along with the
-        aproximate depth.
-
-        We use this function so that we can compare and see if someones current
-        depth at their current scrollback is within pagination range of the
-        event extremeties. If the current depth is close to the depth of given
-        oldest event, we can trigger a backfill.
+        """
+        Gets the oldest events (backwards extremities) in the room along with the
+        approximate depth. Sorted by depth, highest to lowest (descending).
 
         Args:
             room_id: Room where we want to find the oldest events
 
         Returns:
-            List of (event_id, depth) tuples
+            List of (event_id, depth) tuples. Sorted by depth, highest to lowest
+            (descending)
         """
 
-        def get_oldest_event_ids_with_depth_in_room_txn(
+        def get_backfill_points_in_room_txn(
             txn: LoggingTransaction, room_id: str
         ) -> List[Tuple[str, int]]:
-            # Assemble a dictionary with event_id -> depth for the oldest events
+            # Assemble a tuple lookup of event_id -> depth for the oldest events
             # we know of in the room. Backwards extremeties are the oldest
             # events we know of in the room but we only know of them because
-            # some other event referenced them by prev_event and aren't peristed
-            # in our database yet (meaning we don't know their depth
-            # specifically). So we need to look for the aproximate depth from
+            # some other event referenced them by prev_event and aren't
+            # persisted in our database yet (meaning we don't know their depth
+            # specifically). So we need to look for the approximate depth from
             # the events connected to the current backwards extremeties.
             sql = """
-                SELECT b.event_id, MAX(e.depth) FROM events as e
+                SELECT backward_extrem.event_id, event.depth FROM events AS event
                 /**
                  * Get the edge connections from the event_edges table
                  * so we can see whether this event's prev_events points
                  * to a backward extremity in the next join.
                  */
-                INNER JOIN event_edges as g
-                ON g.event_id = e.event_id
+                INNER JOIN event_edges AS edge
+                ON edge.event_id = event.event_id
                 /**
                  * We find the "oldest" events in the room by looking for
                  * events connected to backwards extremeties (oldest events
                  * in the room that we know of so far).
                  */
-                INNER JOIN event_backward_extremities as b
-                ON g.prev_event_id = b.event_id
-                WHERE b.room_id = ? AND g.is_state is ?
-                GROUP BY b.event_id
+                INNER JOIN event_backward_extremities AS backward_extrem
+                ON edge.prev_event_id = backward_extrem.event_id
+                /**
+                 * We use this info to make sure we don't retry a backfill point
+                 * if we've already attempted to backfill from it recently.
+                 */
+                LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info
+                ON
+                    failed_backfill_attempt_info.room_id = backward_extrem.room_id
+                    AND failed_backfill_attempt_info.event_id = backward_extrem.event_id
+                WHERE
+                    backward_extrem.room_id = ?
+                    /* We only care about non-state edges because we used to use
+                     * `event_edges` for two different sorts of "edges" (the current
+                     * event DAG, but also a link to the previous state, for state
+                     * events). These legacy state event edges can be distinguished by
+                     * `is_state` and are removed from the codebase and schema but
+                     * because the schema change is in a background update, it's not
+                     * necessarily safe to assume that it will have been completed.
+                     */
+                    AND edge.is_state is ? /* False */
+                    /**
+                     * Exponential back-off (up to the upper bound) so we don't retry the
+                     * same backfill point over and over. e.g. 2hr, 4hr, 8hr, 16hr, etc.
+                     *
+                     * We use `1 << n` as a power of 2 equivalent for compatibility
+                     * with older SQLites. The left shift equivalent only works with
+                     * powers of 2 because left shift is a binary operation (base-2).
+                     * Otherwise, we would use `power(2, n)` or the power operator, `2^n`.
+                     */
+                    AND (
+                        failed_backfill_attempt_info.event_id IS NULL
+                        OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */)
+                    )
+                /**
+                 * Sort from highest to the lowest depth. Then tie-break on
+                 * alphabetical order of the event_ids so we get a consistent
+                 * ordering which is nice when asserting things in tests.
+                 */
+                ORDER BY event.depth DESC, backward_extrem.event_id DESC
             """
 
-            txn.execute(sql, (room_id, False))
+            if isinstance(self.database_engine, PostgresEngine):
+                least_function = "least"
+            elif isinstance(self.database_engine, Sqlite3Engine):
+                least_function = "min"
+            else:
+                raise RuntimeError("Unknown database engine")
+
+            txn.execute(
+                sql % (least_function,),
+                (
+                    room_id,
+                    False,
+                    self._clock.time_msec(),
+                    1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
+                    1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
+                ),
+            )
 
             return cast(List[Tuple[str, int]], txn.fetchall())
 
         return await self.db_pool.runInteraction(
-            "get_oldest_event_ids_with_depth_in_room",
-            get_oldest_event_ids_with_depth_in_room_txn,
+            "get_backfill_points_in_room",
+            get_backfill_points_in_room_txn,
             room_id,
         )
 
     @trace
     async def get_insertion_event_backward_extremities_in_room(
-        self, room_id: str
+        self,
+        room_id: str,
     ) -> List[Tuple[str, int]]:
-        """Get the insertion events we know about that we haven't backfilled yet.
-
-        We use this function so that we can compare and see if someones current
-        depth at their current scrollback is within pagination range of the
-        insertion event. If the current depth is close to the depth of given
-        insertion event, we can trigger a backfill.
+        """
+        Get the insertion events we know about that we haven't backfilled yet
+        along with the approximate depth. Sorted by depth, highest to lowest
+        (descending).
 
         Args:
             room_id: Room where we want to find the oldest events
 
         Returns:
-            List of (event_id, depth) tuples
+            List of (event_id, depth) tuples. Sorted by depth, highest to lowest
+            (descending)
         """
 
         def get_insertion_event_backward_extremities_in_room_txn(
             txn: LoggingTransaction, room_id: str
         ) -> List[Tuple[str, int]]:
             sql = """
-                SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
+                SELECT
+                    insertion_event_extremity.event_id, event.depth
                 /* We only want insertion events that are also marked as backwards extremities */
-                INNER JOIN insertion_event_extremities as b USING (event_id)
+                FROM insertion_event_extremities AS insertion_event_extremity
                 /* Get the depth of the insertion event from the events table */
-                INNER JOIN events AS e USING (event_id)
-                WHERE b.room_id = ?
-                GROUP BY b.event_id
+                INNER JOIN events AS event USING (event_id)
+                /**
+                 * We use this info to make sure we don't retry a backfill point
+                 * if we've already attempted to backfill from it recently.
+                 */
+                LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info
+                ON
+                    failed_backfill_attempt_info.room_id = insertion_event_extremity.room_id
+                    AND failed_backfill_attempt_info.event_id = insertion_event_extremity.event_id
+                WHERE
+                    insertion_event_extremity.room_id = ?
+                    /**
+                     * Exponential back-off (up to the upper bound) so we don't retry the
+                     * same backfill point over and over. e.g. 2hr, 4hr, 8hr, 16hr, etc.
+                     *
+                     * We use `1 << n` as a power of 2 equivalent for compatibility
+                     * with older SQLites. The left shift equivalent only works with
+                     * powers of 2 because left shift is a binary operation (base-2).
+                     * Otherwise, we would use `power(2, n)` or the power operator, `2^n`.
+                     */
+                    AND (
+                        failed_backfill_attempt_info.event_id IS NULL
+                        OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */)
+                    )
+                /**
+                 * Sort from highest to the lowest depth. Then tie-break on
+                 * alphabetical order of the event_ids so we get a consistent
+                 * ordering which is nice when asserting things in tests.
+                 */
+                ORDER BY event.depth DESC, insertion_event_extremity.event_id DESC
             """
 
-            txn.execute(sql, (room_id,))
+            if isinstance(self.database_engine, PostgresEngine):
+                least_function = "least"
+            elif isinstance(self.database_engine, Sqlite3Engine):
+                least_function = "min"
+            else:
+                raise RuntimeError("Unknown database engine")
+
+            txn.execute(
+                sql % (least_function,),
+                (
+                    room_id,
+                    self._clock.time_msec(),
+                    1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
+                    1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
+                ),
+            )
             return cast(List[Tuple[str, int]], txn.fetchall())
 
         return await self.db_pool.runInteraction(
@@ -1539,7 +1640,12 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         self,
         room_id: str,
     ) -> Optional[Tuple[str, str]]:
-        """Get the next event ID in the staging area for the given room."""
+        """
+        Get the next event ID in the staging area for the given room.
+
+        Returns:
+            Tuple of the `origin` and `event_id`
+        """
 
         def _get_next_staged_event_id_for_room_txn(
             txn: LoggingTransaction,