diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index eec55b6478..309a4ba664 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import datetime
import itertools
import logging
from queue import Empty, PriorityQueue
@@ -33,6 +34,7 @@ from synapse.api.constants import MAX_DEPTH, EventTypes
from synapse.api.errors import StoreError
from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.events import EventBase, make_event_from_dict
+from synapse.logging.opentracing import tag_args, trace
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
@@ -42,11 +44,12 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
+from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
if TYPE_CHECKING:
@@ -70,6 +73,30 @@ pdus_pruned_from_federation_queue = Counter(
logger = logging.getLogger(__name__)
+# Parameters controlling exponential backoff between backfill failures.
+# After the first failure to backfill, we wait 2 hours before trying again. If the
+# second attempt fails, we wait 4 hours before trying again. If the third attempt fails,
+# we wait 8 hours before trying again, ... and so on.
+#
+# Each successive backoff period is twice as long as the last. However, we cap this
+# period at a maximum of 2^8 = 256 hours: a little over 10 days. (This is the smallest
+# power of 2 which yields a maximum backoff period of at least 7 days, the original
+# maximum backoff period.) Even when we hit this cap, we will continue to make
+# backfill attempts roughly once every 10 days.
+BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS = 8
+BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS = int(
+ datetime.timedelta(hours=1).total_seconds() * 1000
+)
+
+# We need a cap on the power of 2 or else the backoff period
+# 2^N * (milliseconds per hour)
+# will overflow when calculated within the database. We ensure overflow does not occur
+# by checking that the largest backoff period fits in a 32-bit signed integer.
+_LONGEST_BACKOFF_PERIOD_MILLISECONDS = (
+ 2**BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS
+) * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
+assert 0 < _LONGEST_BACKOFF_PERIOD_MILLISECONDS <= ((2**31) - 1)
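+
+# For illustration only (not used at runtime), the backoff delay after `n` failed
+# attempts works out to:
+#
+#     delay_ms = (
+#         1 << min(n, BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS)
+#     ) * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
+#
+# i.e. n=1 -> 2 hours, n=2 -> 4 hours, n=3 -> 8 hours, ..., n>=8 -> 256 hours.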
+
# All the info we need while iterating the DAG while backfilling
@attr.s(frozen=True, slots=True, auto_attribs=True)
@@ -126,6 +153,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
)
return await self.get_events_as_list(event_ids)
+ @trace
+ @tag_args
async def get_auth_chain_ids(
self,
room_id: str,
@@ -709,95 +738,264 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
# Return all events where not all sets can reach them.
return {eid for eid, n in event_to_missing_sets.items() if n}
- async def get_oldest_event_ids_with_depth_in_room(
- self, room_id: str
+ @trace
+ @tag_args
+ async def get_backfill_points_in_room(
+ self,
+ room_id: str,
+ current_depth: int,
+ limit: int,
) -> List[Tuple[str, int]]:
- """Gets the oldest events(backwards extremities) in the room along with the
- aproximate depth.
-
- We use this function so that we can compare and see if someones current
- depth at their current scrollback is within pagination range of the
- event extremeties. If the current depth is close to the depth of given
- oldest event, we can trigger a backfill.
+ """
+ Get the backward extremities to backfill from in the room along with the
+ approximate depth.
+
+ Only returns events that are at a depth lower than or
+ equal to the `current_depth`. Sorted by depth, highest to lowest (descending)
+ so the closest events to the `current_depth` are first in the list.
+
+ We ignore extremities that are newer than the user's current scroll position
+ (ie, those with depth greater than `current_depth`) as:
+ 1. we don't really care about getting events that have happened
+ after our current position; and
+ 2. by the nature of paginating and scrolling back, we have likely
+ previously tried and failed to backfill from that extremity, so
+ to avoid getting "stuck" requesting the same backfill repeatedly
+ we drop those extremities.
Args:
room_id: Room where we want to find the oldest events
+ current_depth: The depth at the user's current scrollback position
+ limit: The max number of backfill points to return
Returns:
- List of (event_id, depth) tuples
+ List of (event_id, depth) tuples. Sorted by depth, highest to lowest
+ (descending) so the closest events to the `current_depth` are first
+ in the list.
"""
- def get_oldest_event_ids_with_depth_in_room_txn(
+ def get_backfill_points_in_room_txn(
txn: LoggingTransaction, room_id: str
) -> List[Tuple[str, int]]:
- # Assemble a dictionary with event_id -> depth for the oldest events
+            # Assemble a list of (event_id, depth) tuples for the oldest events
# we know of in the room. Backwards extremeties are the oldest
# events we know of in the room but we only know of them because
- # some other event referenced them by prev_event and aren't peristed
- # in our database yet (meaning we don't know their depth
- # specifically). So we need to look for the aproximate depth from
+ # some other event referenced them by prev_event and aren't
+ # persisted in our database yet (meaning we don't know their depth
+ # specifically). So we need to look for the approximate depth from
# the events connected to the current backwards extremeties.
- sql = """
- SELECT b.event_id, MAX(e.depth) FROM events as e
+
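+            # SQLite has no LEAST(); its scalar, multi-argument MIN() behaves
+            # the same way, so pick the spelling per engine.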
+ if isinstance(self.database_engine, PostgresEngine):
+ least_function = "LEAST"
+ elif isinstance(self.database_engine, Sqlite3Engine):
+ least_function = "MIN"
+ else:
+ raise RuntimeError("Unknown database engine")
+
+ sql = f"""
+ SELECT backward_extrem.event_id, event.depth FROM events AS event
/**
* Get the edge connections from the event_edges table
* so we can see whether this event's prev_events points
* to a backward extremity in the next join.
*/
- INNER JOIN event_edges as g
- ON g.event_id = e.event_id
+ INNER JOIN event_edges AS edge
+ ON edge.event_id = event.event_id
/**
* We find the "oldest" events in the room by looking for
* events connected to backwards extremeties (oldest events
* in the room that we know of so far).
*/
- INNER JOIN event_backward_extremities as b
- ON g.prev_event_id = b.event_id
- WHERE b.room_id = ? AND g.is_state is ?
- GROUP BY b.event_id
+ INNER JOIN event_backward_extremities AS backward_extrem
+ ON edge.prev_event_id = backward_extrem.event_id
+ /**
+                 * We use this info to make sure we don't retry a backfill point
+ * if we've already attempted to backfill from it recently.
+ */
+ LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info
+ ON
+ failed_backfill_attempt_info.room_id = backward_extrem.room_id
+ AND failed_backfill_attempt_info.event_id = backward_extrem.event_id
+ WHERE
+ backward_extrem.room_id = ?
+ /* We only care about non-state edges because we used to use
+ * `event_edges` for two different sorts of "edges" (the current
+ * event DAG, but also a link to the previous state, for state
+ * events). These legacy state event edges can be distinguished by
+                 * `is_state`. They have been removed from the codebase and schema,
+                 * but because that schema change runs as a background update, it's
+                 * not necessarily safe to assume it has completed.
+ */
+ AND edge.is_state is ? /* False */
+ /**
+ * We only want backwards extremities that are older than or at
+                 * the same position as the given `current_depth` (where older
+ * means less than the given depth) because we're looking backwards
+ * from the `current_depth` when backfilling.
+ *
+ * current_depth (ignore events that come after this, ignore 2-4)
+ * |
+ * ▼
+ * <oldest-in-time> [0]<--[1]<--[2]<--[3]<--[4] <newest-in-time>
+ */
+ AND event.depth <= ? /* current_depth */
+ /**
+ * Exponential back-off (up to the upper bound) so we don't retry the
+                 * same backfill point over and over, e.g. 2hr, 4hr, 8hr, 16hr, etc.
+ *
+                 * We use `1 << n` to compute powers of 2, for compatibility with
+                 * older SQLite versions that lack a `power()` function. The left
+                 * shift trick only works for powers of 2 because shifting is a
+                 * base-2 operation; otherwise we would use `power(2, n)`.
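+                 *
+                 * Worked example (illustrative): with num_attempts = 3, the
+                 * backfill point becomes eligible again once
+                 *     current_time >= last_attempt_ts + (1 << 3) * step
+                 * i.e. 8 hours after the last failed attempt.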
+ */
+ AND (
+ failed_backfill_attempt_info.event_id IS NULL
+ OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + (
+ (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */))
+ * ? /* step */
+ )
+ )
+ /**
+ * Sort from highest (closest to the `current_depth`) to the lowest depth
+ * because the closest are most relevant to backfill from first.
+ * Then tie-break on alphabetical order of the event_ids so we get a
+ * consistent ordering which is nice when asserting things in tests.
+ */
+ ORDER BY event.depth DESC, backward_extrem.event_id DESC
+ LIMIT ?
"""
- txn.execute(sql, (room_id, False))
+ txn.execute(
+ sql,
+ (
+ room_id,
+ False,
+ current_depth,
+ self._clock.time_msec(),
+ BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+ BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS,
+ limit,
+ ),
+ )
return cast(List[Tuple[str, int]], txn.fetchall())
return await self.db_pool.runInteraction(
- "get_oldest_event_ids_with_depth_in_room",
- get_oldest_event_ids_with_depth_in_room_txn,
+ "get_backfill_points_in_room",
+ get_backfill_points_in_room_txn,
room_id,
)
+ @trace
async def get_insertion_event_backward_extremities_in_room(
- self, room_id: str
+ self,
+ room_id: str,
+ current_depth: int,
+ limit: int,
) -> List[Tuple[str, int]]:
- """Get the insertion events we know about that we haven't backfilled yet.
-
- We use this function so that we can compare and see if someones current
- depth at their current scrollback is within pagination range of the
- insertion event. If the current depth is close to the depth of given
- insertion event, we can trigger a backfill.
+ """
+ Get the insertion events we know about that we haven't backfilled yet
+ along with the approximate depth. Only returns insertion events that are
+ at a depth lower than or equal to the `current_depth`. Sorted by depth,
+ highest to lowest (descending) so the closest events to the
+ `current_depth` are first in the list.
+
+ We ignore insertion events that are newer than the user's current scroll
+ position (ie, those with depth greater than `current_depth`) as:
+ 1. we don't really care about getting events that have happened
+ after our current position; and
+ 2. by the nature of paginating and scrolling back, we have likely
+ previously tried and failed to backfill from that insertion event, so
+ to avoid getting "stuck" requesting the same backfill repeatedly
+            we drop those insertion events.
Args:
room_id: Room where we want to find the oldest events
+ current_depth: The depth at the user's current scrollback position
+ limit: The max number of insertion event extremities to return
Returns:
- List of (event_id, depth) tuples
+ List of (event_id, depth) tuples. Sorted by depth, highest to lowest
+ (descending) so the closest events to the `current_depth` are first
+ in the list.
"""
def get_insertion_event_backward_extremities_in_room_txn(
txn: LoggingTransaction, room_id: str
) -> List[Tuple[str, int]]:
- sql = """
- SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
+ if isinstance(self.database_engine, PostgresEngine):
+ least_function = "LEAST"
+ elif isinstance(self.database_engine, Sqlite3Engine):
+ least_function = "MIN"
+ else:
+ raise RuntimeError("Unknown database engine")
+
+ sql = f"""
+ SELECT
+ insertion_event_extremity.event_id, event.depth
/* We only want insertion events that are also marked as backwards extremities */
- INNER JOIN insertion_event_extremities as b USING (event_id)
+ FROM insertion_event_extremities AS insertion_event_extremity
/* Get the depth of the insertion event from the events table */
- INNER JOIN events AS e USING (event_id)
- WHERE b.room_id = ?
- GROUP BY b.event_id
+ INNER JOIN events AS event USING (event_id)
+ /**
+                 * We use this info to make sure we don't retry a backfill point
+ * if we've already attempted to backfill from it recently.
+ */
+ LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info
+ ON
+ failed_backfill_attempt_info.room_id = insertion_event_extremity.room_id
+ AND failed_backfill_attempt_info.event_id = insertion_event_extremity.event_id
+ WHERE
+ insertion_event_extremity.room_id = ?
+ /**
+ * We only want extremities that are older than or at
+                 * the same position as the given `current_depth` (where older
+ * means less than the given depth) because we're looking backwards
+ * from the `current_depth` when backfilling.
+ *
+ * current_depth (ignore events that come after this, ignore 2-4)
+ * |
+ * ▼
+ * <oldest-in-time> [0]<--[1]<--[2]<--[3]<--[4] <newest-in-time>
+ */
+ AND event.depth <= ? /* current_depth */
+ /**
+ * Exponential back-off (up to the upper bound) so we don't retry the
+                 * same backfill point over and over, e.g. 2hr, 4hr, 8hr, 16hr, etc.
+ *
+                 * We use `1 << n` to compute powers of 2, for compatibility with
+                 * older SQLite versions that lack a `power()` function. The left
+                 * shift trick only works for powers of 2 because shifting is a
+                 * base-2 operation; otherwise we would use `power(2, n)`.
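+                 *
+                 * (Same worked example as in get_backfill_points_in_room above:
+                 * with num_attempts = 3 the extremity becomes eligible again
+                 * 8 hours after last_attempt_ts.)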
+ */
+ AND (
+ failed_backfill_attempt_info.event_id IS NULL
+ OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + (
+ (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */))
+ * ? /* step */
+ )
+ )
+ /**
+ * Sort from highest (closest to the `current_depth`) to the lowest depth
+ * because the closest are most relevant to backfill from first.
+ * Then tie-break on alphabetical order of the event_ids so we get a
+ * consistent ordering which is nice when asserting things in tests.
+ */
+ ORDER BY event.depth DESC, insertion_event_extremity.event_id DESC
+ LIMIT ?
"""
- txn.execute(sql, (room_id,))
+ txn.execute(
+ sql,
+ (
+ room_id,
+ current_depth,
+ self._clock.time_msec(),
+ BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+ BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS,
+ limit,
+ ),
+ )
return cast(List[Tuple[str, int]], txn.fetchall())
return await self.db_pool.runInteraction(
@@ -970,6 +1168,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
return int(min_depth) if min_depth is not None else None
+ @cancellable
async def get_forward_extremities_for_room_at_stream_ordering(
self, room_id: str, stream_ordering: int
) -> List[str]:
@@ -1286,6 +1485,105 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
return event_id_results
+ @trace
+ async def record_event_failed_pull_attempt(
+ self, room_id: str, event_id: str, cause: str
+ ) -> None:
+ """
+ Record when we fail to pull an event over federation.
+
+ This information allows us to be more intelligent when we decide to
+ retry (we don't need to fail over and over) and we can process that
+ event in the background so we don't block on it each time.
+
+ Args:
+ room_id: The room where the event failed to pull from
+ event_id: The event that failed to be fetched or processed
+ cause: The error message or reason that we failed to pull the event
+ """
+ logger.debug(
+ "record_event_failed_pull_attempt room_id=%s, event_id=%s, cause=%s",
+ room_id,
+ event_id,
+ cause,
+ )
+ await self.db_pool.runInteraction(
+ "record_event_failed_pull_attempt",
+ self._record_event_failed_pull_attempt_upsert_txn,
+ room_id,
+ event_id,
+ cause,
+ db_autocommit=True, # Safe as it's a single upsert
+ )
+
+ def _record_event_failed_pull_attempt_upsert_txn(
+ self,
+ txn: LoggingTransaction,
+ room_id: str,
+ event_id: str,
+ cause: str,
+ ) -> None:
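+        # Upsert: the first failure inserts a row with num_attempts=1; each
+        # subsequent failure for the same (room_id, event_id) increments
+        # num_attempts and refreshes last_attempt_ts and last_cause.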
+ sql = """
+ INSERT INTO event_failed_pull_attempts (
+ room_id, event_id, num_attempts, last_attempt_ts, last_cause
+ )
+ VALUES (?, ?, ?, ?, ?)
+ ON CONFLICT (room_id, event_id) DO UPDATE SET
+ num_attempts=event_failed_pull_attempts.num_attempts + 1,
+ last_attempt_ts=EXCLUDED.last_attempt_ts,
+ last_cause=EXCLUDED.last_cause;
+ """
+
+ txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))
+
+ @trace
+ async def get_event_ids_to_not_pull_from_backoff(
+ self,
+ room_id: str,
+ event_ids: Collection[str],
+ ) -> List[str]:
+ """
+        Filter down the events to the ones that we've recently failed to pull and
+        are still backing off from, per the exponential backoff schedule.
+
+ Args:
+ room_id: The room that the events belong to
+ event_ids: A list of events to filter down
+
+ Returns:
+ List of event_ids that should not be attempted to be pulled
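+
+        Example (illustrative usage; `store` is hypothetical, not part of this
+        change):
+            ids_to_skip = set(
+                await store.get_event_ids_to_not_pull_from_backoff(room_id, event_ids)
+            )
+            event_ids_to_pull = [e for e in event_ids if e not in ids_to_skip]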
+ """
+ event_failed_pull_attempts = await self.db_pool.simple_select_many_batch(
+ table="event_failed_pull_attempts",
+ column="event_id",
+ iterable=event_ids,
+ keyvalues={},
+ retcols=(
+ "event_id",
+ "last_attempt_ts",
+ "num_attempts",
+ ),
+ desc="get_event_ids_to_not_pull_from_backoff",
+ )
+
+ current_time = self._clock.time_msec()
+ return [
+ event_failed_pull_attempt["event_id"]
+ for event_failed_pull_attempt in event_failed_pull_attempts
+ # Exponential back-off (up to the upper bound) so we don't try to
+ # pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc.
+ if current_time
+ < event_failed_pull_attempt["last_attempt_ts"]
+ + (
+ 2
+ ** min(
+ event_failed_pull_attempt["num_attempts"],
+ BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+ )
+ )
+ * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
+ ]
+
async def get_missing_events(
self,
room_id: str,
@@ -1339,6 +1637,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
event_results.reverse()
return event_results
+ @trace
+ @tag_args
async def get_successor_events(self, event_id: str) -> List[str]:
"""Fetch all events that have the given event as a prev event
@@ -1375,6 +1675,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
_delete_old_forward_extrem_cache_txn,
)
+ @trace
async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
await self.db_pool.simple_upsert(
table="insertion_event_extremities",
@@ -1483,7 +1784,12 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
self,
room_id: str,
) -> Optional[Tuple[str, str]]:
- """Get the next event ID in the staging area for the given room."""
+ """
+ Get the next event ID in the staging area for the given room.
+
+ Returns:
+ Tuple of the `origin` and `event_id`
+ """
def _get_next_staged_event_id_for_room_txn(
txn: LoggingTransaction,
@@ -1597,7 +1903,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
logger.info("Invalid prev_events for %s", event_id)
continue
- if room_version.event_format == EventFormatVersions.V1:
+ if room_version.event_format == EventFormatVersions.ROOM_V1_V2:
for prev_event_tuple in prev_events:
if (
not isinstance(prev_event_tuple, list)
|