diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 583d5ecd77..e1a4265a64 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -226,9 +226,7 @@ class FederationHandler:
"""
backwards_extremities = [
_BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
- for event_id, depth in await self.store.get_oldest_event_ids_with_depth_in_room(
- room_id
- )
+ for event_id, depth in await self.store.get_backfill_points_in_room(room_id)
]
insertion_events_to_be_backfilled: List[_BackfillPoint] = []
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index ef477978ed..3251fca6fb 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import datetime
import itertools
import logging
from queue import Empty, PriorityQueue
@@ -43,7 +44,7 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@@ -72,6 +73,13 @@ pdus_pruned_from_federation_queue = Counter(
logger = logging.getLogger(__name__)
+BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS: int = int(
+ datetime.timedelta(days=7).total_seconds()
+)
+BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS: int = int(
+ datetime.timedelta(hours=1).total_seconds()
+)
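+# Illustration (not used by the code below): with these values, the wait after
+# `num_attempts` failures is `min((1 << num_attempts) * step, upper_bound)`,
+# i.e. 2h, 4h, 8h, ..., capped at 7 days from the 8th attempt onwards.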
+
# All the info we need while iterating the DAG while backfilling
@attr.s(frozen=True, slots=True, auto_attribs=True)
@@ -715,96 +723,189 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
@trace
@tag_args
- async def get_oldest_event_ids_with_depth_in_room(
- self, room_id: str
+ async def get_backfill_points_in_room(
+ self,
+ room_id: str,
) -> List[Tuple[str, int]]:
- """Gets the oldest events(backwards extremities) in the room along with the
- aproximate depth.
-
- We use this function so that we can compare and see if someones current
- depth at their current scrollback is within pagination range of the
- event extremeties. If the current depth is close to the depth of given
- oldest event, we can trigger a backfill.
+ """
+        Gets the oldest events (backwards extremities) in the room along with the
+ approximate depth. Sorted by depth, highest to lowest (descending).
Args:
room_id: Room where we want to find the oldest events
Returns:
- List of (event_id, depth) tuples
+ List of (event_id, depth) tuples. Sorted by depth, highest to lowest
+ (descending)
"""
- def get_oldest_event_ids_with_depth_in_room_txn(
+ def get_backfill_points_in_room_txn(
txn: LoggingTransaction, room_id: str
) -> List[Tuple[str, int]]:
- # Assemble a dictionary with event_id -> depth for the oldest events
+            # Assemble a list of (event_id, depth) tuples for the oldest events
# we know of in the room. Backwards extremities are the oldest
# events we know of in the room but we only know of them because
- # some other event referenced them by prev_event and aren't peristed
- # in our database yet (meaning we don't know their depth
- # specifically). So we need to look for the aproximate depth from
+ # some other event referenced them by prev_event and aren't
+ # persisted in our database yet (meaning we don't know their depth
+ # specifically). So we need to look for the approximate depth from
# the events connected to the current backwards extremities.
sql = """
- SELECT b.event_id, MAX(e.depth) FROM events as e
+ SELECT backward_extrem.event_id, event.depth FROM events AS event
/**
* Get the edge connections from the event_edges table
* so we can see whether this event's prev_events points
* to a backward extremity in the next join.
*/
- INNER JOIN event_edges as g
- ON g.event_id = e.event_id
+ INNER JOIN event_edges AS edge
+ ON edge.event_id = event.event_id
/**
* We find the "oldest" events in the room by looking for
* events connected to backwards extremities (oldest events
* in the room that we know of so far).
*/
- INNER JOIN event_backward_extremities as b
- ON g.prev_event_id = b.event_id
- WHERE b.room_id = ? AND g.is_state is ?
- GROUP BY b.event_id
+ INNER JOIN event_backward_extremities AS backward_extrem
+ ON edge.prev_event_id = backward_extrem.event_id
+ /**
+             * We use this info to make sure we don't retry a backfill point
+             * that we've already attempted to backfill from recently.
+ */
+ LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info
+ ON
+ failed_backfill_attempt_info.room_id = backward_extrem.room_id
+ AND failed_backfill_attempt_info.event_id = backward_extrem.event_id
+ WHERE
+ backward_extrem.room_id = ?
+ /* We only care about non-state edges because we used to use
+ * `event_edges` for two different sorts of "edges" (the current
+ * event DAG, but also a link to the previous state, for state
+ * events). These legacy state event edges can be distinguished by
+ * `is_state` and are removed from the codebase and schema but
+ * because the schema change is in a background update, it's not
+ * necessarily safe to assume that it will have been completed.
+ */
+ AND edge.is_state is ? /* False */
+ /**
+ * Exponential back-off (up to the upper bound) so we don't retry the
+             * same backfill point over and over. e.g. 2hr, 4hr, 8hr, 16hr, etc.
+ *
+ * We use `1 << n` as a power of 2 equivalent for compatibility
+ * with older SQLites. The left shift equivalent only works with
+ * powers of 2 because left shift is a binary operation (base-2).
+ * Otherwise, we would use `power(2, n)` or the power operator, `2^n`.
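+             * (For example, `1 << 3` and `power(2, 3)` both evaluate to 8.)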
+ */
+ AND (
+ failed_backfill_attempt_info.event_id IS NULL
+ OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */)
+ )
+ /**
+             * Sort from highest to lowest depth, then tie-break in descending
+             * order of the event_ids so we get a consistent ordering, which
+             * is nice when asserting things in tests.
+ */
+ ORDER BY event.depth DESC, backward_extrem.event_id DESC
"""
- txn.execute(sql, (room_id, False))
+ if isinstance(self.database_engine, PostgresEngine):
+ least_function = "least"
+ elif isinstance(self.database_engine, Sqlite3Engine):
+ least_function = "min"
+ else:
+ raise RuntimeError("Unknown database engine")
+
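+            # NOTE: `least_function` is interpolated via `%` rather than bound
+            # as a query parameter because it is a SQL function name chosen
+            # from the fixed set above, not user-supplied data.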
+ txn.execute(
+ sql % (least_function,),
+ (
+ room_id,
+ False,
+ self._clock.time_msec(),
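+                    # `last_attempt_ts` is stored in milliseconds, so scale the
+                    # second-based backoff constants to match.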
+ 1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
+ 1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
+ ),
+ )
return cast(List[Tuple[str, int]], txn.fetchall())
return await self.db_pool.runInteraction(
- "get_oldest_event_ids_with_depth_in_room",
- get_oldest_event_ids_with_depth_in_room_txn,
+ "get_backfill_points_in_room",
+ get_backfill_points_in_room_txn,
room_id,
)
@trace
async def get_insertion_event_backward_extremities_in_room(
- self, room_id: str
+ self,
+ room_id: str,
) -> List[Tuple[str, int]]:
- """Get the insertion events we know about that we haven't backfilled yet.
-
- We use this function so that we can compare and see if someones current
- depth at their current scrollback is within pagination range of the
- insertion event. If the current depth is close to the depth of given
- insertion event, we can trigger a backfill.
+ """
+ Get the insertion events we know about that we haven't backfilled yet
+ along with the approximate depth. Sorted by depth, highest to lowest
+ (descending).
Args:
room_id: Room where we want to find the oldest events
Returns:
- List of (event_id, depth) tuples
+ List of (event_id, depth) tuples. Sorted by depth, highest to lowest
+ (descending)
"""
def get_insertion_event_backward_extremities_in_room_txn(
txn: LoggingTransaction, room_id: str
) -> List[Tuple[str, int]]:
sql = """
- SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
+ SELECT
+ insertion_event_extremity.event_id, event.depth
/* We only want insertion events that are also marked as backwards extremities */
- INNER JOIN insertion_event_extremities as b USING (event_id)
+ FROM insertion_event_extremities AS insertion_event_extremity
/* Get the depth of the insertion event from the events table */
- INNER JOIN events AS e USING (event_id)
- WHERE b.room_id = ?
- GROUP BY b.event_id
+ INNER JOIN events AS event USING (event_id)
+ /**
+             * We use this info to make sure we don't retry a backfill point
+             * that we've already attempted to backfill from recently.
+ */
+ LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info
+ ON
+ failed_backfill_attempt_info.room_id = insertion_event_extremity.room_id
+ AND failed_backfill_attempt_info.event_id = insertion_event_extremity.event_id
+ WHERE
+ insertion_event_extremity.room_id = ?
+ /**
+ * Exponential back-off (up to the upper bound) so we don't retry the
+             * same backfill point over and over. e.g. 2hr, 4hr, 8hr, 16hr, etc.
+ *
+ * We use `1 << n` as a power of 2 equivalent for compatibility
+ * with older SQLites. The left shift equivalent only works with
+ * powers of 2 because left shift is a binary operation (base-2).
+ * Otherwise, we would use `power(2, n)` or the power operator, `2^n`.
+ */
+ AND (
+ failed_backfill_attempt_info.event_id IS NULL
+ OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */)
+ )
+ /**
+             * Sort from highest to lowest depth, then tie-break in descending
+             * order of the event_ids so we get a consistent ordering, which
+             * is nice when asserting things in tests.
+ */
+ ORDER BY event.depth DESC, insertion_event_extremity.event_id DESC
"""
- txn.execute(sql, (room_id,))
+ if isinstance(self.database_engine, PostgresEngine):
+ least_function = "least"
+ elif isinstance(self.database_engine, Sqlite3Engine):
+ least_function = "min"
+ else:
+ raise RuntimeError("Unknown database engine")
+
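+            # As above, `least_function` is a trusted SQL function name, so
+            # interpolating it via `%` is safe.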
+ txn.execute(
+ sql % (least_function,),
+ (
+ room_id,
+ self._clock.time_msec(),
+ 1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
+ 1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
+ ),
+ )
return cast(List[Tuple[str, int]], txn.fetchall())
return await self.db_pool.runInteraction(
@@ -1539,7 +1640,12 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
self,
room_id: str,
) -> Optional[Tuple[str, str]]:
- """Get the next event ID in the staging area for the given room."""
+ """
+ Get the next event ID in the staging area for the given room.
+
+ Returns:
+ Tuple of the `origin` and `event_id`
+ """
def _get_next_staged_event_id_for_room_txn(
txn: LoggingTransaction,
|