| author | Eric Eastwood <erice@element.io> | 2022-09-22 01:03:35 -0500 |
|---|---|---|
| committer | Eric Eastwood <erice@element.io> | 2022-09-22 01:03:35 -0500 |
| commit | a25821d1600d48b2759a031505190a797047b92b (patch) | |
| tree | da72b3d9a07dd363ef9047261de33c37c8d5ec1f /synapse | |
| parent | Calculate the stream_ordering from newest -> oldest (in the correct order) an... (diff) | |
| download | synapse-a25821d1600d48b2759a031505190a797047b92b.tar.xz | |
Try sort backfill points by tie breaking on stream_ordering
Diffstat (limited to 'synapse')
| -rw-r--r-- | synapse/handlers/federation.py | 110 |
| -rw-r--r-- | synapse/handlers/federation_event.py | 39 |
| -rw-r--r-- | synapse/storage/databases/main/event_federation.py | 183 |
3 files changed, 220 insertions, 112 deletions
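The headline change is the sort key for backfill points: rather than ordering by depth alone, ties are now broken by `stream_ordering` and then `event_id`. A minimal sketch of that ordering (the dataclass below is a stand-in for Synapse's `_BackfillPoint`, which also carries a `type` field):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BackfillPoint:
    # Stand-in for Synapse's `_BackfillPoint`; the real attrs class also
    # carries a `type` field (backwards extremity vs. insertion event).
    event_id: str
    depth: int
    stream_ordering: int


points = [
    BackfillPoint("$b", depth=5, stream_ordering=7),
    BackfillPoint("$a", depth=5, stream_ordering=7),
    BackfillPoint("$c", depth=5, stream_ordering=9),
    BackfillPoint("$d", depth=6, stream_ordering=1),
]

# Highest depth first; within a depth, highest stream_ordering first;
# event_id ascending as the final tie-break for a deterministic order.
points.sort(key=lambda e: (-e.depth, -e.stream_ordering, e.event_id))

assert [p.event_id for p in points] == ["$d", "$c", "$a", "$b"]
```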
```diff
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 377ac04f8c..223f343f1a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -38,7 +38,7 @@ from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
 
 from synapse import event_auth
-from synapse.api.constants import EventContentFields, EventTypes, Membership
+from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
@@ -60,13 +60,7 @@ from synapse.events.validator import EventValidator
 from synapse.federation.federation_client import InvalidResponseError
 from synapse.http.servlet import assert_params_in_dict
 from synapse.logging.context import nested_logging_context
-from synapse.logging.tracing import (
-    SynapseTags,
-    set_attribute,
-    start_active_span,
-    tag_args,
-    trace,
-)
+from synapse.logging.tracing import SynapseTags, set_attribute, tag_args, trace
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.module_api import NOT_SPAM
 from synapse.replication.http.federation import (
@@ -125,6 +119,7 @@ class _BackfillPoint:
 
     event_id: str
     depth: int
+    stream_ordering: int
     type: _BackfillPointType
@@ -231,18 +226,24 @@ class FederationHandler:
             processing. Only used for timing.
         """
         backwards_extremities = [
-            _BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
-            for event_id, depth in await self.store.get_oldest_event_ids_with_depth_in_room(
-                room_id
+            _BackfillPoint(
+                event_id, depth, stream_ordering, _BackfillPointType.BACKWARDS_EXTREMITY
+            )
+            for event_id, depth, stream_ordering in await self.store.get_backfill_points_in_room(
+                room_id=room_id,
+                current_depth=current_depth,
             )
         ]
 
         insertion_events_to_be_backfilled: List[_BackfillPoint] = []
         if self.hs.config.experimental.msc2716_enabled:
             insertion_events_to_be_backfilled = [
-                _BackfillPoint(event_id, depth, _BackfillPointType.INSERTION_PONT)
-                for event_id, depth in await self.store.get_insertion_event_backward_extremities_in_room(
-                    room_id
+                _BackfillPoint(
+                    event_id, depth, stream_ordering, _BackfillPointType.INSERTION_PONT
+                )
+                for event_id, depth, stream_ordering in await self.store.get_insertion_event_backward_extremities_in_room(
+                    room_id=room_id,
+                    current_depth=current_depth,
                 )
             ]
         logger.debug(
@@ -251,10 +252,6 @@ class FederationHandler:
             insertion_events_to_be_backfilled,
         )
 
-        if not backwards_extremities and not insertion_events_to_be_backfilled:
-            logger.debug("Not backfilling as no extremeties found.")
-            return False
-
         # we now have a list of potential places to backpaginate from. We prefer to
         # start with the most recent (ie, max depth), so let's sort the list.
         sorted_backfill_points: List[_BackfillPoint] = sorted(
@@ -262,7 +259,7 @@ class FederationHandler:
                 backwards_extremities,
                 insertion_events_to_be_backfilled,
             ),
-            key=lambda e: -int(e.depth),
+            key=lambda e: (-e.depth, -e.stream_ordering, e.event_id),
         )
 
         logger.debug(
@@ -275,6 +272,29 @@ class FederationHandler:
             sorted_backfill_points,
         )
 
+        # If we have no backfill points lower than the `current_depth` then
+        # either we can a) bail or b) still attempt to backfill. We opt to try
+        # backfilling anyway just in case we do get relevant events.
+        if not sorted_backfill_points and current_depth != MAX_DEPTH:
+            logger.debug(
+                "_maybe_backfill_inner: all backfill points are *after* current depth. Backfilling anyway."
+            )
+            return await self._maybe_backfill_inner(
+                room_id=room_id,
+                # We use `MAX_DEPTH` so that we find all backfill points next
+                # time (all events are below the `MAX_DEPTH`)
+                current_depth=MAX_DEPTH,
+                limit=limit,
+                processing_start_time=processing_start_time,
+            )
+        elif not sorted_backfill_points and current_depth == MAX_DEPTH:
+            # Even after trying again with `MAX_DEPTH`, we didn't find any
+            # backward extremities to backfill from.
+            logger.debug(
+                "_maybe_backfill_inner: Not backfilling as no backward extremeties found."
+            )
+            return False
+
         # If we're approaching an extremity we trigger a backfill, otherwise we
         # no-op.
         #
@@ -288,43 +308,16 @@ class FederationHandler:
         # XXX: shouldn't we do this *after* the filter by depth below? Again, we don't
         # care about events that have happened after our current position.
         #
-        max_depth = sorted_backfill_points[0].depth
-        if current_depth - 2 * limit > max_depth:
+        max_depth_of_backfill_points = sorted_backfill_points[0].depth
+        if current_depth - 2 * limit > max_depth_of_backfill_points:
             logger.debug(
                 "Not backfilling as we don't need to. %d < %d - 2 * %d",
-                max_depth,
+                max_depth_of_backfill_points,
                 current_depth,
                 limit,
             )
             return False
 
-        # We ignore extremities that have a greater depth than our current depth
-        # as:
-        #   1. we don't really care about getting events that have happened
-        #      after our current position; and
-        #   2. we have likely previously tried and failed to backfill from that
-        #      extremity, so to avoid getting "stuck" requesting the same
-        #      backfill repeatedly we drop those extremities.
-        #
-        # However, we need to check that the filtered extremities are non-empty.
-        # If they are empty then either we can a) bail or b) still attempt to
-        # backfill. We opt to try backfilling anyway just in case we do get
-        # relevant events.
-        #
-        filtered_sorted_backfill_points = [
-            t for t in sorted_backfill_points if t.depth <= current_depth
-        ]
-        if filtered_sorted_backfill_points:
-            logger.debug(
-                "_maybe_backfill_inner: backfill points before current depth: %s",
-                filtered_sorted_backfill_points,
-            )
-            sorted_backfill_points = filtered_sorted_backfill_points
-        else:
-            logger.debug(
-                "_maybe_backfill_inner: all backfill points are *after* current depth. Backfilling anyway."
-            )
-
         # For performance's sake, we only want to paginate from a particular extremity
         # if we can actually see the events we'll get. Otherwise, we'd just spend a lot
         # of resources to get redacted events. We check each extremity in turn and
@@ -346,14 +339,10 @@ class FederationHandler:
         # attempting to paginate before backfill reached the visible history.
 
         extremities_to_request: List[str] = []
-        for i, bp in enumerate(sorted_backfill_points):
+        for bp in sorted_backfill_points:
             if len(extremities_to_request) >= 5:
                 break
 
-            set_attribute(
-                SynapseTags.RESULT_PREFIX + "backfill_point" + str(i), str(bp)
-            )
-
             # For regular backwards extremities, we don't have the extremity events
             # themselves, so we need to actually check the events that reference them -
             # their "successor" events.
@@ -410,13 +399,12 @@ class FederationHandler:
             str(len(extremities_to_request)),
         )
 
-        with start_active_span("getting likely_domains"):
-            # Now we need to decide which hosts to hit first.
-            # First we try hosts that are already in the room.
-            # TODO: HEURISTIC ALERT.
-            likely_domains = (
-                await self._storage_controllers.state.get_current_hosts_in_room(room_id)
-            )
+        # Now we need to decide which hosts to hit first.
+        # First we try hosts that are already in the room.
+        # TODO: HEURISTIC ALERT.
+        likely_domains = (
+            await self._storage_controllers.state.get_current_hosts_in_room(room_id)
+        )
 
         async def try_backfill(domains: Collection[str]) -> bool:
             # TODO: Should we try multiple of these at a time?
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 40fbcce53a..5aaf2dd9e0 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -649,13 +649,31 @@ class FederationEventHandler:
         #
         # We expect the events from the `/backfill` response to start from
         # `?v` and include events that preceded it (so the list will be
-        # newest -> oldest). This is at-most a convention between Synapse
-        # servers as the order is not specced.
+        # newest -> oldest, reverse chronological). This is at-most a
+        # convention between Synapse servers as the order is not specced.
         #
-        # Reverse the list of events
+        # We want to calculate the `stream_ordering` from newest -> oldest
+        # (so historical events sort in the correct order) and persist in
+        # oldest -> newest to get the least missing `prev_event` fetch
+        # thrashing.
         reverse_chronological_events = events
+        # `[::-1]` is just syntax to reverse the list and give us a copy
        chronological_events = reverse_chronological_events[::-1]
 
+        logger.info(
+            "backfill assumed reverse_chronological_events=%s",
+            [
+                "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+                % (
+                    event.event_id,
+                    event.depth,
+                    event.content.get("body", event.type),
+                    event.prev_event_ids(),
+                )
+                for event in reverse_chronological_events
+            ],
+        )
+
         from synapse.storage.util.id_generators import AbstractStreamIdGenerator
 
         # This should only exist on instances that are configured to write
@@ -826,6 +844,21 @@ class FederationEventHandler:
             # We want to sort these by depth so we process them and
             # tell clients about them in order.
             sorted_events = sorted(events, key=lambda x: x.depth)
+
+            logger.info(
+                "backfill sorted_events=%s",
+                [
+                    "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+                    % (
+                        event.event_id,
+                        event.depth,
+                        event.content.get("body", event.type),
+                        event.prev_event_ids(),
+                    )
+                    for event in sorted_events
+                ],
+            )
+
             for ev in sorted_events:
                 with nested_logging_context(ev.event_id):
                     await self._process_pulled_event(origin, ev, backfilled=backfilled)
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 0669d54822..29c461d441 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import datetime
 import itertools
 import logging
 from queue import Empty, PriorityQueue
@@ -43,7 +44,7 @@ from synapse.storage.database import (
 )
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.signatures import SignatureWorkerStore
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
@@ -72,7 +73,6 @@ pdus_pruned_from_federation_queue = Counter(
 
 logger = logging.getLogger(__name__)
 
-
 # All the info we need while iterating the DAG while backfilling
 @attr.s(frozen=True, slots=True, auto_attribs=True)
 class BackfillQueueNavigationItem:
@@ -715,97 +715,179 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
 
     @trace
     @tag_args
-    async def get_oldest_event_ids_with_depth_in_room(
-        self, room_id: str
-    ) -> List[Tuple[str, int]]:
-        """Gets the oldest events(backwards extremities) in the room along with the
-        aproximate depth.
-
-        We use this function so that we can compare and see if someones current
-        depth at their current scrollback is within pagination range of the
-        event extremeties. If the current depth is close to the depth of given
-        oldest event, we can trigger a backfill.
+    async def get_backfill_points_in_room(
+        self,
+        room_id: str,
+        current_depth: int,
+    ) -> List[Tuple[str, int, int]]:
+        """
+        Gets the oldest events(backwards extremities) in the room along with the
+        approximate depth. Sorted by depth, highest to lowest (descending) so the closest
+        events to the `current_depth` are first in the list.
+
+        We use this function so that we can compare and see if a client's
+        `current_depth` at their current scrollback is within pagination range
+        of the event extremities. If the `current_depth` is close to the depth
+        of given oldest event, we can trigger a backfill.
+
+        We ignore extremities that have a greater depth than our `current_depth`
+        as:
+          1. we don't really care about getting events that have happened
+             after our current position; and
+          2. by the nature of paginating and scrolling back, we have likely
+             previously tried and failed to backfill from that extremity, so
+             to avoid getting "stuck" requesting the same backfill repeatedly
+             we drop those extremities.
 
         Args:
             room_id: Room where we want to find the oldest events
+            current_depth: The depth at the users current scrollback position
+                because we only care about finding events older than the given
+                `current_depth` when scrolling and paginating backwards.
 
         Returns:
-            List of (event_id, depth) tuples
+            List of (event_id, depth, stream_ordering) tuples. Sorted by depth,
+            highest to lowest (descending) so the closest events to the
+            `current_depth` are first in the list. Tie-broken with `stream_ordering`,
+            then `event_id` to get a stable sort.
         """
 
-        def get_oldest_event_ids_with_depth_in_room_txn(
+        def get_backfill_points_in_room_txn(
             txn: LoggingTransaction, room_id: str
-        ) -> List[Tuple[str, int]]:
-            # Assemble a dictionary with event_id -> depth for the oldest events
+        ) -> List[Tuple[str, int, int]]:
+            # Assemble a tuple lookup of event_id -> depth for the oldest events
+            # we know of in the room. Backwards extremeties are the oldest
             # events we know of in the room but we only know of them because
-            # some other event referenced them by prev_event and aren't peristed
-            # in our database yet (meaning we don't know their depth
-            # specifically). So we need to look for the aproximate depth from
+            # some other event referenced them by prev_event and aren't
+            # persisted in our database yet (meaning we don't know their depth
+            # specifically). So we need to look for the approximate depth from
             # the events connected to the current backwards extremeties.
             sql = """
-                SELECT b.event_id, MAX(e.depth) FROM events as e
+                SELECT backward_extrem.event_id, event.depth, event.stream_ordering FROM events AS event
                 /**
                  * Get the edge connections from the event_edges table
                  * so we can see whether this event's prev_events points
                  * to a backward extremity in the next join.
                  */
-                INNER JOIN event_edges as g
-                    ON g.event_id = e.event_id
+                INNER JOIN event_edges AS edge
+                    ON edge.event_id = event.event_id
                 /**
                  * We find the "oldest" events in the room by looking for
                  * events connected to backwards extremeties (oldest events
                  * in the room that we know of so far).
                  */
-                INNER JOIN event_backward_extremities as b
-                    ON g.prev_event_id = b.event_id
-                WHERE b.room_id = ? AND g.is_state is ?
-                GROUP BY b.event_id
+                INNER JOIN event_backward_extremities AS backward_extrem
+                    ON edge.prev_event_id = backward_extrem.event_id
+                WHERE
+                    backward_extrem.room_id = ?
+                    /* We only care about non-state events because TODO: why */
+                    AND edge.is_state is ? /* False */
+                    /**
+                     * We only want backwards extremities that are older than or at
+                     * the same position of the given `current_depth` (where older
+                     * means less than the given depth) because we're looking backwards
+                     * from the `current_depth` when backfilling.
+                     *
+                     *                         current_depth (ignore events that come after this, ignore 2-4)
+                     *                         |
+                     * <oldest-in-time> [0]<--[1]▼<--[2]<--[3]<--[4] <newest-in-time>
+                     */
+                    AND event.depth <= ? /* current_depth */
+                /**
+                 * Sort from highest (closest to the `current_depth`) to the lowest depth
+                 * because the closest are most relevant to backfill from first.
+                 * Then tie-break on alphabetical order of the event_ids so we get a
+                 * consistent ordering which is nice when asserting things in tests.
+                 */
+                ORDER BY event.depth DESC, event.stream_ordering DESC, backward_extrem.event_id DESC
             """
 
-            txn.execute(sql, (room_id, False))
+            txn.execute(
+                sql,
+                (
+                    room_id,
+                    False,
+                    current_depth,
+                ),
+            )
 
-            return cast(List[Tuple[str, int]], txn.fetchall())
+            return cast(List[Tuple[str, int, int]], txn.fetchall())
 
         return await self.db_pool.runInteraction(
-            "get_oldest_event_ids_with_depth_in_room",
-            get_oldest_event_ids_with_depth_in_room_txn,
+            "get_backfill_points_in_room",
+            get_backfill_points_in_room_txn,
             room_id,
         )
 
     @trace
     async def get_insertion_event_backward_extremities_in_room(
-        self, room_id: str
-    ) -> List[Tuple[str, int]]:
-        """Get the insertion events we know about that we haven't backfilled yet.
-
-        We use this function so that we can compare and see if someones current
-        depth at their current scrollback is within pagination range of the
-        insertion event. If the current depth is close to the depth of given
-        insertion event, we can trigger a backfill.
+        self,
+        room_id: str,
+        current_depth: int,
+    ) -> List[Tuple[str, int, int]]:
+        """
+        Get the insertion events we know about that we haven't backfilled yet
+        along with the approximate depth. Sorted by depth, highest to lowest
+        (descending) so the closest events to the `current_depth` are first
+        in the list.
+
+        We use this function so that we can compare and see if someones
+        `current_depth` at their current scrollback is within pagination range
+        of the insertion event. If the `current_depth` is close to the depth
+        of the given insertion event, we can trigger a backfill.
+
+        We ignore insertion events that have a greater depth than our `current_depth`
+        as:
+          1. we don't really care about getting events that have happened
+             after our current position; and
+          2. by the nature of paginating and scrolling back, we have likely
+             previously tried and failed to backfill from that insertion event, so
+             to avoid getting "stuck" requesting the same backfill repeatedly
+             we drop those insertion event.
 
         Args:
             room_id: Room where we want to find the oldest events
+            current_depth: The depth at the users current scrollback position because
+                we only care about finding events older than the given
+                `current_depth` when scrolling and paginating backwards.
 
         Returns:
-            List of (event_id, depth) tuples
+            List of (event_id, depth, stream_ordering) tuples. Sorted by depth,
+            highest to lowest (descending) so the closest events to the
+            `current_depth` are first in the list. Tie-broken with `stream_ordering`,
+            then `event_id` to get a stable sort.
         """
 
         def get_insertion_event_backward_extremities_in_room_txn(
             txn: LoggingTransaction, room_id: str
-        ) -> List[Tuple[str, int]]:
+        ) -> List[Tuple[str, int, int]]:
             sql = """
-                SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
+                SELECT
+                    insertion_event_extremity.event_id, event.depth, event.stream_ordering
                 /* We only want insertion events that are also marked as backwards extremities */
-                INNER JOIN insertion_event_extremities as b USING (event_id)
+                FROM insertion_event_extremities AS insertion_event_extremity
                 /* Get the depth of the insertion event from the events table */
-                INNER JOIN events AS e USING (event_id)
-                WHERE b.room_id = ?
-                GROUP BY b.event_id
+                INNER JOIN events AS event USING (event_id)
+                WHERE
+                    insertion_event_extremity.room_id = ?
+                    AND event.depth <= ? /* current_depth */
+                /**
+                 * Sort from highest (closest to the `current_depth`) to the lowest depth
+                 * because the closest are most relevant to backfill from first.
+                 * Then tie-break on alphabetical order of the event_ids so we get a
+                 * consistent ordering which is nice when asserting things in tests.
+                 */
+                ORDER BY event.depth DESC, event.stream_ordering DESC, insertion_event_extremity.event_id DESC
             """
-            txn.execute(sql, (room_id,))
-            return cast(List[Tuple[str, int]], txn.fetchall())
+            txn.execute(
+                sql,
+                (
+                    room_id,
+                    current_depth,
+                ),
+            )
+            return cast(List[Tuple[str, int, int]], txn.fetchall())
 
         return await self.db_pool.runInteraction(
             "get_insertion_event_backward_extremities_in_room",
@@ -1539,7 +1621,12 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         self,
         room_id: str,
     ) -> Optional[Tuple[str, str]]:
-        """Get the next event ID in the staging area for the given room."""
+        """
+        Get the next event ID in the staging area for the given room.
+
+        Returns:
+            Tuple of the `origin` and `event_id`
+        """
 
         def _get_next_staged_event_id_for_room_txn(
             txn: LoggingTransaction,
```
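The new `get_backfill_points_in_room` query filters out extremities deeper than `current_depth` and returns the rows closest to it first. A runnable sketch of that filter-and-order shape against a toy table (this simplified schema is an illustration, not Synapse's real `events`/`event_backward_extremities` schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE events (event_id TEXT, depth INTEGER, stream_ordering INTEGER);
    INSERT INTO events VALUES
        ('$old', 2, 10),
        ('$mid', 5, 20),
        ('$new', 9, 30);
    """
)

current_depth = 5
rows = conn.execute(
    """
    SELECT event_id, depth, stream_ordering FROM events
    WHERE depth <= ?  -- ignore events after our scrollback position
    ORDER BY depth DESC, stream_ordering DESC, event_id DESC
    """,
    (current_depth,),
).fetchall()

# '$new' (depth 9) is filtered out; '$mid' is closest to current_depth.
assert rows == [("$mid", 5, 20), ("$old", 2, 10)]
```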
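Those query results feed the new fallback in `_maybe_backfill_inner`: if nothing sits at or below `current_depth`, it recurses once with `MAX_DEPTH` so every known point qualifies, and only then gives up. Reduced to a toy sketch (plain `(event_id, depth)` tuples and an assumed `MAX_DEPTH` value stand in for the real types):

```python
MAX_DEPTH = 2**63 - 1  # assumption: stands in for synapse.api.constants.MAX_DEPTH


def maybe_backfill(points: list, current_depth: int) -> bool:
    """Toy version of the retry flow; `points` holds (event_id, depth) tuples."""
    usable = [(event_id, depth) for event_id, depth in points if depth <= current_depth]
    if not usable and current_depth != MAX_DEPTH:
        # All backfill points are *after* our position: retry once with
        # MAX_DEPTH so every known point qualifies, in case any are relevant.
        return maybe_backfill(points, MAX_DEPTH)
    if not usable:
        # Even at MAX_DEPTH there was nothing to backfill from.
        return False
    return True  # the real code picks hosts and issues /backfill requests here


assert maybe_backfill([("$later", 10)], current_depth=3) is True
assert maybe_backfill([], current_depth=3) is False
```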