summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2022-02-07 15:54:13 -0600
committerGitHub <noreply@github.com>2022-02-07 15:54:13 -0600
commitfef2e792beec9c126953b4f6b6d2d9f6e31ed96f (patch)
tree08b9a3f17413b80a4893c49ac0df23f35c64adcb /synapse
parentRemove optional state of `ApplicationService.is_interested`'s `store` paramet... (diff)
downloadsynapse-fef2e792beec9c126953b4f6b6d2d9f6e31ed96f.tar.xz
Fix historical messages backfilling in random order on remote homeservers (MSC2716) (#11114)
Fix https://github.com/matrix-org/synapse/issues/11091
Fix https://github.com/matrix-org/synapse/issues/10764 (side-stepping the issue because we no longer have to deal with `fake_prev_event_id`)

 1. Made the `/backfill` response return messages in `(depth, stream_ordering)` order (previously only sorted by `depth`)
    - Technically, it shouldn't really matter how `/backfill` returns things but I'm just trying to make the `stream_ordering` a little more consistent from the origin to the remote homeservers in order to get the order of messages from `/messages` consistent ([sorted by `(topological_ordering, stream_ordering)`](https://github.com/matrix-org/synapse/blob/develop/docs/development/room-dag-concepts.md#depth-and-stream-ordering)).
    - Even now that we return backfilled messages in order, it still doesn't guarantee the same `stream_ordering` (and more importantly the [`/messages` order](https://github.com/matrix-org/synapse/blob/develop/docs/development/room-dag-concepts.md#depth-and-stream-ordering)) on the other server. For example, if a room has a bunch of history imported and someone visits a permalink to a historical message back in time, their homeserver will skip over the historical messages in between and insert the permalink as the next message in the `stream_order` and totally throw off the sort.
       - This will be even more the case when we add the [MSC3030 jump to date API endpoint](https://github.com/matrix-org/matrix-doc/pull/3030) so the static archives can navigate and jump to a certain date.
       - We're solving this in the future by switching to [online topological ordering](https://github.com/matrix-org/gomatrixserverlib/issues/187) and [chunking](https://github.com/matrix-org/synapse/issues/3785) which by its nature will apply retroactively to fix any inconsistencies introduced by people permalinking
 2. As we're navigating `prev_events` to return in `/backfill`, we order by `depth` first (newest -> oldest) and now also tie-break based on the `stream_ordering` (newest -> oldest). This is technically important because MSC2716 inserts a bunch of historical messages at the same `depth` so it's best to be prescriptive about which ones we should process first. In reality, I think the code already looped over the historical messages as expected because the database is already in order.
 3. Making the historical state chain and historical event chain float on their own by having no `prev_events` instead of a fake `prev_event` which caused backfill to get clogged with an unresolvable event. Fixes https://github.com/matrix-org/synapse/issues/11091 and https://github.com/matrix-org/synapse/issues/10764
 4. We no longer find connected insertion events by finding a potential `prev_event` connection to the current event we're iterating over. We now solely rely on marker events which when processed, add the insertion event as an extremity and the federating homeserver can ask about it when time calls.
    - Related discussion, https://github.com/matrix-org/synapse/pull/11114#discussion_r741514793


Before | After
--- | ---
![](https://user-images.githubusercontent.com/558581/139218681-b465c862-5c49-4702-a59e-466733b0cf45.png) | ![](https://user-images.githubusercontent.com/558581/146453159-a1609e0a-8324-439d-ae44-e4bce43ac6d1.png)



#### Why aren't we sorting topologically when receiving backfill events?

> The main reason we're going to opt to not sort topologically when receiving backfill events is because it's probably best to do whatever is easiest to make it just work. People will probably have opinions once they look at [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) which could change whatever implementation anyway.
> 
> As mentioned, ideally we would do this but code necessary to make the fake edges but it gets confusing and gives an impression of “just whyyyy” (feels icky). This problem also dissolves with online topological ordering.
>
> -- https://github.com/matrix-org/synapse/pull/11114#discussion_r741517138

See https://github.com/matrix-org/synapse/pull/11114#discussion_r739610091 for the technical difficulties
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/federation.py27
-rw-r--r--synapse/handlers/federation_event.py34
-rw-r--r--synapse/handlers/message.py20
-rw-r--r--synapse/handlers/room_batch.py44
-rw-r--r--synapse/handlers/room_member.py22
-rw-r--r--synapse/rest/client/room_batch.py17
-rw-r--r--synapse/storage/databases/main/event_federation.py313
-rw-r--r--synapse/storage/databases/main/events.py13
8 files changed, 341 insertions, 149 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index a37ae0ca09..c0f642005f 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -166,9 +166,14 @@ class FederationHandler:
         oldest_events_with_depth = (
             await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
         )
-        insertion_events_to_be_backfilled = (
-            await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
-        )
+
+        insertion_events_to_be_backfilled: Dict[str, int] = {}
+        if self.hs.config.experimental.msc2716_enabled:
+            insertion_events_to_be_backfilled = (
+                await self.store.get_insertion_event_backward_extremities_in_room(
+                    room_id
+                )
+            )
         logger.debug(
             "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
             oldest_events_with_depth,
@@ -271,11 +276,12 @@ class FederationHandler:
         ]
 
         logger.debug(
-            "room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems: %s filtered_sorted_extremeties_tuple: %s",
+            "room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems (%d): %s filtered_sorted_extremeties_tuple: %s",
             room_id,
             current_depth,
             limit,
             max_depth,
+            len(sorted_extremeties_tuple),
             sorted_extremeties_tuple,
             filtered_sorted_extremeties_tuple,
         )
@@ -1047,6 +1053,19 @@ class FederationHandler:
         limit = min(limit, 100)
 
         events = await self.store.get_backfill_events(room_id, pdu_list, limit)
+        logger.debug(
+            "on_backfill_request: backfill events=%s",
+            [
+                "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+                % (
+                    event.event_id,
+                    event.depth,
+                    event.content.get("body", event.type),
+                    event.prev_event_ids(),
+                )
+                for event in events
+            ],
+        )
 
         events = await filter_events_for_server(self.storage, origin, events)
 
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 3905f60b3a..9edc7369d6 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -508,7 +508,11 @@ class FederationEventHandler:
                     f"room {ev.room_id}, when we were backfilling in {room_id}"
                 )
 
-        await self._process_pulled_events(dest, events, backfilled=True)
+        await self._process_pulled_events(
+            dest,
+            events,
+            backfilled=True,
+        )
 
     async def _get_missing_events_for_pdu(
         self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
@@ -626,11 +630,24 @@ class FederationEventHandler:
             backfilled: True if this is part of a historical batch of events (inhibits
                 notification to clients, and validation of device keys.)
         """
+        logger.debug(
+            "processing pulled backfilled=%s events=%s",
+            backfilled,
+            [
+                "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+                % (
+                    event.event_id,
+                    event.depth,
+                    event.content.get("body", event.type),
+                    event.prev_event_ids(),
+                )
+                for event in events
+            ],
+        )
 
         # We want to sort these by depth so we process them and
         # tell clients about them in order.
         sorted_events = sorted(events, key=lambda x: x.depth)
-
         for ev in sorted_events:
             with nested_logging_context(ev.event_id):
                 await self._process_pulled_event(origin, ev, backfilled=backfilled)
@@ -992,6 +1009,8 @@ class FederationEventHandler:
 
         await self._run_push_actions_and_persist_event(event, context, backfilled)
 
+        await self._handle_marker_event(origin, event)
+
         if backfilled or context.rejected:
             return
 
@@ -1071,8 +1090,6 @@ class FederationEventHandler:
                     event.sender,
                 )
 
-        await self._handle_marker_event(origin, event)
-
     async def _resync_device(self, sender: str) -> None:
         """We have detected that the device list for the given user may be out
         of sync, so we try and resync them.
@@ -1323,7 +1340,14 @@ class FederationEventHandler:
             return event, context
 
         events_to_persist = (x for x in (prep(event) for event in fetched_events) if x)
-        await self.persist_events_and_notify(room_id, tuple(events_to_persist))
+        await self.persist_events_and_notify(
+            room_id,
+            tuple(events_to_persist),
+            # Mark these events backfilled as they're historic events that will
+            # eventually be backfilled. For example, missing events we fetch
+            # during backfill should be marked as backfilled as well.
+            backfilled=True,
+        )
 
     async def _check_event_auth(
         self,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index b37250aa38..9267e586a8 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -490,12 +490,12 @@ class EventCreationHandler:
         requester: Requester,
         event_dict: dict,
         txn_id: Optional[str] = None,
+        allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
         require_consent: bool = True,
         outlier: bool = False,
         historical: bool = False,
-        allow_no_prev_events: bool = False,
         depth: Optional[int] = None,
     ) -> Tuple[EventBase, EventContext]:
         """
@@ -510,6 +510,10 @@ class EventCreationHandler:
             requester
             event_dict: An entire event
             txn_id
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
             prev_event_ids:
                 the forward extremities to use as the prev_events for the
                 new event.
@@ -604,10 +608,10 @@ class EventCreationHandler:
         event, context = await self.create_new_client_event(
             builder=builder,
             requester=requester,
+            allow_no_prev_events=allow_no_prev_events,
             prev_event_ids=prev_event_ids,
             auth_event_ids=auth_event_ids,
             depth=depth,
-            allow_no_prev_events=allow_no_prev_events,
         )
 
         # In an ideal world we wouldn't need the second part of this condition. However,
@@ -764,6 +768,7 @@ class EventCreationHandler:
         self,
         requester: Requester,
         event_dict: dict,
+        allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
         ratelimit: bool = True,
@@ -781,6 +786,10 @@ class EventCreationHandler:
         Args:
             requester: The requester sending the event.
             event_dict: An entire event.
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
             prev_event_ids:
                 The event IDs to use as the prev events.
                 Should normally be left as None to automatically request them
@@ -880,16 +889,20 @@ class EventCreationHandler:
         self,
         builder: EventBuilder,
         requester: Optional[Requester] = None,
+        allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
         depth: Optional[int] = None,
-        allow_no_prev_events: bool = False,
     ) -> Tuple[EventBase, EventContext]:
         """Create a new event for a local client
 
         Args:
             builder:
             requester:
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
             prev_event_ids:
                 the forward extremities to use as the prev_events for the
                 new event.
@@ -908,7 +921,6 @@ class EventCreationHandler:
         Returns:
             Tuple of created event, context
         """
-
         # Strip down the auth_event_ids to only what we need to auth the event.
         # For example, we don't need extra m.room.member that don't match event.sender
         full_state_ids_at_event = None
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
index f880aa93d2..f8137ec04c 100644
--- a/synapse/handlers/room_batch.py
+++ b/synapse/handlers/room_batch.py
@@ -13,10 +13,6 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-def generate_fake_event_id() -> str:
-    return "$fake_" + random_string(43)
-
-
 class RoomBatchHandler:
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
@@ -182,11 +178,12 @@ class RoomBatchHandler:
         state_event_ids_at_start = []
         auth_event_ids = initial_auth_event_ids.copy()
 
-        # Make the state events float off on their own so we don't have a
-        # bunch of `@mxid joined the room` noise between each batch
-        prev_event_id_for_state_chain = generate_fake_event_id()
+        # Make the state events float off on their own by specifying no
+        # prev_events for the first one in the chain so we don't have a bunch of
+        # `@mxid joined the room` noise between each batch.
+        prev_event_ids_for_state_chain: List[str] = []
 
-        for state_event in state_events_at_start:
+        for index, state_event in enumerate(state_events_at_start):
             assert_params_in_dict(
                 state_event, ["type", "origin_server_ts", "content", "sender"]
             )
@@ -222,7 +219,10 @@ class RoomBatchHandler:
                     content=event_dict["content"],
                     outlier=True,
                     historical=True,
-                    prev_event_ids=[prev_event_id_for_state_chain],
+                    # Only the first event in the chain should be floating.
+                    # The rest should hang off each other in a chain.
+                    allow_no_prev_events=index == 0,
+                    prev_event_ids=prev_event_ids_for_state_chain,
                     # Make sure to use a copy of this list because we modify it
                     # later in the loop here. Otherwise it will be the same
                     # reference and also update in the event when we append later.
@@ -242,7 +242,10 @@ class RoomBatchHandler:
                     event_dict,
                     outlier=True,
                     historical=True,
-                    prev_event_ids=[prev_event_id_for_state_chain],
+                    # Only the first event in the chain should be floating.
+                    # The rest should hang off each other in a chain.
+                    allow_no_prev_events=index == 0,
+                    prev_event_ids=prev_event_ids_for_state_chain,
                     # Make sure to use a copy of this list because we modify it
                     # later in the loop here. Otherwise it will be the same
                     # reference and also update in the event when we append later.
@@ -253,7 +256,7 @@ class RoomBatchHandler:
             state_event_ids_at_start.append(event_id)
             auth_event_ids.append(event_id)
             # Connect all the state in a floating chain
-            prev_event_id_for_state_chain = event_id
+            prev_event_ids_for_state_chain = [event_id]
 
         return state_event_ids_at_start
 
@@ -261,7 +264,6 @@ class RoomBatchHandler:
         self,
         events_to_create: List[JsonDict],
         room_id: str,
-        initial_prev_event_ids: List[str],
         inherited_depth: int,
         auth_event_ids: List[str],
         app_service_requester: Requester,
@@ -277,9 +279,6 @@ class RoomBatchHandler:
             events_to_create: List of historical events to create in JSON
                 dictionary format.
             room_id: Room where you want the events persisted in.
-            initial_prev_event_ids: These will be the prev_events for the first
-                event created. Each event created afterwards will point to the
-                previous event created.
             inherited_depth: The depth to create the events at (you will
                 probably by calling inherit_depth_from_prev_ids(...)).
             auth_event_ids: Define which events allow you to create the given
@@ -291,11 +290,14 @@ class RoomBatchHandler:
         """
         assert app_service_requester.app_service
 
-        prev_event_ids = initial_prev_event_ids.copy()
+        # Make the historical event chain float off on its own by specifying no
+        # prev_events for the first event in the chain which causes the HS to
+        # ask for the state at the start of the batch later.
+        prev_event_ids: List[str] = []
 
         event_ids = []
         events_to_persist = []
-        for ev in events_to_create:
+        for index, ev in enumerate(events_to_create):
             assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
 
             assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % (
@@ -319,6 +321,9 @@ class RoomBatchHandler:
                     ev["sender"], app_service_requester.app_service
                 ),
                 event_dict,
+                # Only the first event in the chain should be floating.
+                # The rest should hang off each other in a chain.
+                allow_no_prev_events=index == 0,
                 prev_event_ids=event_dict.get("prev_events"),
                 auth_event_ids=auth_event_ids,
                 historical=True,
@@ -370,7 +375,6 @@ class RoomBatchHandler:
         events_to_create: List[JsonDict],
         room_id: str,
         batch_id_to_connect_to: str,
-        initial_prev_event_ids: List[str],
         inherited_depth: int,
         auth_event_ids: List[str],
         app_service_requester: Requester,
@@ -385,9 +389,6 @@ class RoomBatchHandler:
             room_id: Room where you want the events created in.
             batch_id_to_connect_to: The batch_id from the insertion event you
                 want this batch to connect to.
-            initial_prev_event_ids: These will be the prev_events for the first
-                event created. Each event created afterwards will point to the
-                previous event created.
             inherited_depth: The depth to create the events at (you will
                 probably by calling inherit_depth_from_prev_ids(...)).
             auth_event_ids: Define which events allow you to create the given
@@ -436,7 +437,6 @@ class RoomBatchHandler:
         event_ids = await self.persist_historical_events(
             events_to_create=events_to_create,
             room_id=room_id,
-            initial_prev_event_ids=initial_prev_event_ids,
             inherited_depth=inherited_depth,
             auth_event_ids=auth_event_ids,
             app_service_requester=app_service_requester,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index efe6b4c9aa..bf1a47efb0 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -268,7 +268,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         target: UserID,
         room_id: str,
         membership: str,
-        prev_event_ids: List[str],
+        allow_no_prev_events: bool = False,
+        prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
         txn_id: Optional[str] = None,
         ratelimit: bool = True,
@@ -286,8 +287,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             target:
             room_id:
             membership:
-            prev_event_ids: The event IDs to use as the prev events
 
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
+            prev_event_ids: The event IDs to use as the prev events
             auth_event_ids:
                 The event ids to use as the auth_events for the new event.
                 Should normally be left as None, which will cause them to be calculated
@@ -344,6 +349,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 "membership": membership,
             },
             txn_id=txn_id,
+            allow_no_prev_events=allow_no_prev_events,
             prev_event_ids=prev_event_ids,
             auth_event_ids=auth_event_ids,
             require_consent=require_consent,
@@ -446,6 +452,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         require_consent: bool = True,
         outlier: bool = False,
         historical: bool = False,
+        allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
     ) -> Tuple[str, int]:
@@ -470,6 +477,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             historical: Indicates whether the message is being inserted
                 back in time around some existing events. This is used to skip
                 a few checks and mark the event as backfilled.
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
             prev_event_ids: The event IDs to use as the prev events
             auth_event_ids:
                 The event ids to use as the auth_events for the new event.
@@ -504,6 +515,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 require_consent=require_consent,
                 outlier=outlier,
                 historical=historical,
+                allow_no_prev_events=allow_no_prev_events,
                 prev_event_ids=prev_event_ids,
                 auth_event_ids=auth_event_ids,
             )
@@ -525,6 +537,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         require_consent: bool = True,
         outlier: bool = False,
         historical: bool = False,
+        allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
     ) -> Tuple[str, int]:
@@ -551,6 +564,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             historical: Indicates whether the message is being inserted
                 back in time around some existing events. This is used to skip
                 a few checks and mark the event as backfilled.
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
             prev_event_ids: The event IDs to use as the prev events
             auth_event_ids:
                 The event ids to use as the auth_events for the new event.
@@ -680,6 +697,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 membership=effective_membership_state,
                 txn_id=txn_id,
                 ratelimit=ratelimit,
+                allow_no_prev_events=allow_no_prev_events,
                 prev_event_ids=prev_event_ids,
                 auth_event_ids=auth_event_ids,
                 content=content,
diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py
index e4c9451ae0..4b6be38327 100644
--- a/synapse/rest/client/room_batch.py
+++ b/synapse/rest/client/room_batch.py
@@ -131,6 +131,14 @@ class RoomBatchSendEventRestServlet(RestServlet):
             prev_event_ids_from_query
         )
 
+        if not auth_event_ids:
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "No auth events found for given prev_event query parameter. The prev_event=%s probably does not exist."
+                % prev_event_ids_from_query,
+                errcode=Codes.INVALID_PARAM,
+            )
+
         state_event_ids_at_start = []
         # Create and persist all of the state events that float off on their own
         # before the batch. These will most likely be all of the invite/member
@@ -197,21 +205,12 @@ class RoomBatchSendEventRestServlet(RestServlet):
                 EventContentFields.MSC2716_NEXT_BATCH_ID
             ]
 
-        # Also connect the historical event chain to the end of the floating
-        # state chain, which causes the HS to ask for the state at the start of
-        # the batch later. If there is no state chain to connect to, just make
-        # the insertion event float itself.
-        prev_event_ids = []
-        if len(state_event_ids_at_start):
-            prev_event_ids = [state_event_ids_at_start[-1]]
-
         # Create and persist all of the historical events as well as insertion
         # and batch meta events to make the batch navigable in the DAG.
         event_ids, next_batch_id = await self.room_batch_handler.handle_batch_of_events(
             events_to_create=events_to_create,
             room_id=room_id,
             batch_id_to_connect_to=batch_id_to_connect_to,
-            initial_prev_event_ids=prev_event_ids,
             inherited_depth=inherited_depth,
             auth_event_ids=auth_event_ids,
             app_service_requester=requester,
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index ca71f073fc..22f6474127 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -16,9 +16,10 @@ import logging
 from queue import Empty, PriorityQueue
 from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple
 
+import attr
 from prometheus_client import Counter, Gauge
 
-from synapse.api.constants import MAX_DEPTH
+from synapse.api.constants import MAX_DEPTH, EventTypes
 from synapse.api.errors import StoreError
 from synapse.api.room_versions import EventFormatVersions, RoomVersion
 from synapse.events import EventBase, make_event_from_dict
@@ -60,6 +61,15 @@ pdus_pruned_from_federation_queue = Counter(
 logger = logging.getLogger(__name__)
 
 
+# All the info we need while iterating the DAG while backfilling
+@attr.s(frozen=True, slots=True, auto_attribs=True)
+class BackfillQueueNavigationItem:
+    depth: int
+    stream_ordering: int
+    event_id: str
+    type: str
+
+
 class _NoChainCoverIndex(Exception):
     def __init__(self, room_id: str):
         super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,))
@@ -74,6 +84,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
     ):
         super().__init__(database, db_conn, hs)
 
+        self.hs = hs
+
         if hs.config.worker.run_background_tasks:
             hs.get_clock().looping_call(
                 self._delete_old_forward_extrem_cache, 60 * 60 * 1000
@@ -737,7 +749,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             room_id,
         )
 
-    async def get_insertion_event_backwards_extremities_in_room(
+    async def get_insertion_event_backward_extremities_in_room(
         self, room_id
     ) -> Dict[str, int]:
         """Get the insertion events we know about that we haven't backfilled yet.
@@ -754,7 +766,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             Map from event_id to depth
         """
 
-        def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id):
+        def get_insertion_event_backward_extremities_in_room_txn(txn, room_id):
             sql = """
                 SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
                 /* We only want insertion events that are also marked as backwards extremities */
@@ -770,8 +782,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             return dict(txn)
 
         return await self.db_pool.runInteraction(
-            "get_insertion_event_backwards_extremities_in_room",
-            get_insertion_event_backwards_extremities_in_room_txn,
+            "get_insertion_event_backward_extremities_in_room",
+            get_insertion_event_backward_extremities_in_room_txn,
             room_id,
         )
 
@@ -997,143 +1009,242 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             "get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
         )
 
-    async def get_backfill_events(self, room_id: str, event_list: list, limit: int):
-        """Get a list of Events for a given topic that occurred before (and
-        including) the events in event_list. Return a list of max size `limit`
+    def _get_connected_batch_event_backfill_results_txn(
+        self, txn: LoggingTransaction, insertion_event_id: str, limit: int
+    ) -> List[BackfillQueueNavigationItem]:
+        """
+        Find any batch connections of a given insertion event.
+        A batch event points at a insertion event via:
+        batch_event.content[MSC2716_BATCH_ID] -> insertion_event.content[MSC2716_NEXT_BATCH_ID]
 
         Args:
-            room_id
-            event_list
-            limit
+            txn: The database transaction to use
+            insertion_event_id: The event ID to navigate from. We will find
+                batch events that point back at this insertion event.
+            limit: Max number of event ID's to query for and return
+
+        Returns:
+            List of batch events that the backfill queue can process
+        """
+        batch_connection_query = """
+            SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
+            /* Find the batch that connects to the given insertion event */
+            INNER JOIN batch_events AS c
+            ON i.next_batch_id = c.batch_id
+            /* Get the depth of the batch start event from the events table */
+            INNER JOIN events AS e USING (event_id)
+            /* Find an insertion event which matches the given event_id */
+            WHERE i.event_id = ?
+            LIMIT ?
         """
-        event_ids = await self.db_pool.runInteraction(
-            "get_backfill_events",
-            self._get_backfill_events,
-            room_id,
-            event_list,
-            limit,
-        )
-        events = await self.get_events_as_list(event_ids)
-        return sorted(events, key=lambda e: -e.depth)
 
-    def _get_backfill_events(self, txn, room_id, event_list, limit):
-        logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)
+        # Find any batch connections for the given insertion event
+        txn.execute(
+            batch_connection_query,
+            (insertion_event_id, limit),
+        )
+        return [
+            BackfillQueueNavigationItem(
+                depth=row[0],
+                stream_ordering=row[1],
+                event_id=row[2],
+                type=row[3],
+            )
+            for row in txn
+        ]
 
-        event_results = set()
+    def _get_connected_prev_event_backfill_results_txn(
+        self, txn: LoggingTransaction, event_id: str, limit: int
+    ) -> List[BackfillQueueNavigationItem]:
+        """
+        Find any events connected by prev_event the specified event_id.
 
-        # We want to make sure that we do a breadth-first, "depth" ordered
-        # search.
+        Args:
+            txn: The database transaction to use
+            event_id: The event ID to navigate from
+            limit: Max number of event ID's to query for and return
 
+        Returns:
+            List of prev events that the backfill queue can process
+        """
         # Look for the prev_event_id connected to the given event_id
-        query = """
-            SELECT depth, prev_event_id FROM event_edges
-            /* Get the depth of the prev_event_id from the events table */
+        connected_prev_event_query = """
+            SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges
+            /* Get the depth and stream_ordering of the prev_event_id from the events table */
             INNER JOIN events
             ON prev_event_id = events.event_id
-            /* Find an event which matches the given event_id */
+            /* Look for an edge which matches the given event_id */
             WHERE event_edges.event_id = ?
             AND event_edges.is_state = ?
+            /* Because we can have many events at the same depth,
+            * we want to also tie-break and sort on stream_ordering */
+            ORDER BY depth DESC, stream_ordering DESC
             LIMIT ?
         """
 
-        # Look for the "insertion" events connected to the given event_id
-        connected_insertion_event_query = """
-            SELECT e.depth, i.event_id FROM insertion_event_edges AS i
-            /* Get the depth of the insertion event from the events table */
-            INNER JOIN events AS e USING (event_id)
-            /* Find an insertion event which points via prev_events to the given event_id */
-            WHERE i.insertion_prev_event_id = ?
-            LIMIT ?
+        txn.execute(
+            connected_prev_event_query,
+            (event_id, False, limit),
+        )
+        return [
+            BackfillQueueNavigationItem(
+                depth=row[0],
+                stream_ordering=row[1],
+                event_id=row[2],
+                type=row[3],
+            )
+            for row in txn
+        ]
+
+    async def get_backfill_events(
+        self, room_id: str, seed_event_id_list: list, limit: int
+    ):
+        """Get a list of Events for a given topic that occurred before (and
+        including) the events in seed_event_id_list. Return a list of max size `limit`
+
+        Args:
+            room_id
+            seed_event_id_list
+            limit
         """
+        event_ids = await self.db_pool.runInteraction(
+            "get_backfill_events",
+            self._get_backfill_events,
+            room_id,
+            seed_event_id_list,
+            limit,
+        )
+        events = await self.get_events_as_list(event_ids)
+        return sorted(
+            events, key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering)
+        )
 
-        # Find any batch connections of a given insertion event
-        batch_connection_query = """
-            SELECT e.depth, c.event_id FROM insertion_events AS i
-            /* Find the batch that connects to the given insertion event */
-            INNER JOIN batch_events AS c
-            ON i.next_batch_id = c.batch_id
-            /* Get the depth of the batch start event from the events table */
-            INNER JOIN events AS e USING (event_id)
-            /* Find an insertion event which matches the given event_id */
-            WHERE i.event_id = ?
-            LIMIT ?
+    def _get_backfill_events(self, txn, room_id, seed_event_id_list, limit):
+        """
+        We want to make sure that we do a breadth-first, "depth" ordered search.
+        We also handle navigating historical branches of history connected by
+        insertion and batch events.
         """
+        logger.debug(
+            "_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s",
+            room_id,
+            seed_event_id_list,
+            limit,
+        )
+
+        event_id_results = set()
 
         # In a PriorityQueue, the lowest valued entries are retrieved first.
-        # We're using depth as the priority in the queue.
-        # Depth is lowest at the oldest-in-time message and highest and
-        # newest-in-time message. We add events to the queue with a negative depth so that
-        # we process the newest-in-time messages first going backwards in time.
+        # We're using depth as the priority in the queue and tie-break based on
+        # stream_ordering. Depth is lowest at the oldest-in-time message and
+        # highest and newest-in-time message. We add events to the queue with a
+        # negative depth so that we process the newest-in-time messages first
+        # going backwards in time. stream_ordering follows the same pattern.
         queue = PriorityQueue()
 
-        for event_id in event_list:
-            depth = self.db_pool.simple_select_one_onecol_txn(
+        for seed_event_id in seed_event_id_list:
+            event_lookup_result = self.db_pool.simple_select_one_txn(
                 txn,
                 table="events",
-                keyvalues={"event_id": event_id, "room_id": room_id},
-                retcol="depth",
+                keyvalues={"event_id": seed_event_id, "room_id": room_id},
+                retcols=(
+                    "type",
+                    "depth",
+                    "stream_ordering",
+                ),
                 allow_none=True,
             )
 
-            if depth:
-                queue.put((-depth, event_id))
+            if event_lookup_result is not None:
+                logger.debug(
+                    "_get_backfill_events(room_id=%s): seed_event_id=%s depth=%s stream_ordering=%s type=%s",
+                    room_id,
+                    seed_event_id,
+                    event_lookup_result["depth"],
+                    event_lookup_result["stream_ordering"],
+                    event_lookup_result["type"],
+                )
 
-        while not queue.empty() and len(event_results) < limit:
+                if event_lookup_result["depth"]:
+                    queue.put(
+                        (
+                            -event_lookup_result["depth"],
+                            -event_lookup_result["stream_ordering"],
+                            seed_event_id,
+                            event_lookup_result["type"],
+                        )
+                    )
+
+        while not queue.empty() and len(event_id_results) < limit:
             try:
-                _, event_id = queue.get_nowait()
+                _, _, event_id, event_type = queue.get_nowait()
             except Empty:
                 break
 
-            if event_id in event_results:
+            if event_id in event_id_results:
                 continue
 
-            event_results.add(event_id)
+            event_id_results.add(event_id)
 
             # Try and find any potential historical batches of message history.
-            #
-            # First we look for an insertion event connected to the current
-            # event (by prev_event). If we find any, we need to go and try to
-            # find any batch events connected to the insertion event (by
-            # batch_id). If we find any, we'll add them to the queue and
-            # navigate up the DAG like normal in the next iteration of the loop.
-            txn.execute(
-                connected_insertion_event_query, (event_id, limit - len(event_results))
-            )
-            connected_insertion_event_id_results = txn.fetchall()
-            logger.debug(
-                "_get_backfill_events: connected_insertion_event_query %s",
-                connected_insertion_event_id_results,
-            )
-            for row in connected_insertion_event_id_results:
-                connected_insertion_event_depth = row[0]
-                connected_insertion_event = row[1]
-                queue.put((-connected_insertion_event_depth, connected_insertion_event))
+            if self.hs.config.experimental.msc2716_enabled:
+                # We need to go and try to find any batch events connected
+                # to a given insertion event (by batch_id). If we find any, we'll
+                # add them to the queue and navigate up the DAG like normal in the
+                # next iteration of the loop.
+                if event_type == EventTypes.MSC2716_INSERTION:
+                    # Find any batch connections for the given insertion event
+                    connected_batch_event_backfill_results = (
+                        self._get_connected_batch_event_backfill_results_txn(
+                            txn, event_id, limit - len(event_id_results)
+                        )
+                    )
+                    logger.debug(
+                        "_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s",
+                        room_id,
+                        connected_batch_event_backfill_results,
+                    )
+                    for (
+                        connected_batch_event_backfill_item
+                    ) in connected_batch_event_backfill_results:
+                        if (
+                            connected_batch_event_backfill_item.event_id
+                            not in event_id_results
+                        ):
+                            queue.put(
+                                (
+                                    -connected_batch_event_backfill_item.depth,
+                                    -connected_batch_event_backfill_item.stream_ordering,
+                                    connected_batch_event_backfill_item.event_id,
+                                    connected_batch_event_backfill_item.type,
+                                )
+                            )
 
-                # Find any batch connections for the given insertion event
-                txn.execute(
-                    batch_connection_query,
-                    (connected_insertion_event, limit - len(event_results)),
-                )
-                batch_start_event_id_results = txn.fetchall()
-                logger.debug(
-                    "_get_backfill_events: batch_start_event_id_results %s",
-                    batch_start_event_id_results,
+            # Now we just look up the DAG by prev_events as normal
+            connected_prev_event_backfill_results = (
+                self._get_connected_prev_event_backfill_results_txn(
+                    txn, event_id, limit - len(event_id_results)
                 )
-                for row in batch_start_event_id_results:
-                    if row[1] not in event_results:
-                        queue.put((-row[0], row[1]))
-
-            txn.execute(query, (event_id, False, limit - len(event_results)))
-            prev_event_id_results = txn.fetchall()
+            )
             logger.debug(
-                "_get_backfill_events: prev_event_ids %s", prev_event_id_results
+                "_get_backfill_events(room_id=%s): connected_prev_event_backfill_results=%s",
+                room_id,
+                connected_prev_event_backfill_results,
             )
+            for (
+                connected_prev_event_backfill_item
+            ) in connected_prev_event_backfill_results:
+                if connected_prev_event_backfill_item.event_id not in event_id_results:
+                    queue.put(
+                        (
+                            -connected_prev_event_backfill_item.depth,
+                            -connected_prev_event_backfill_item.stream_ordering,
+                            connected_prev_event_backfill_item.event_id,
+                            connected_prev_event_backfill_item.type,
+                        )
+                    )
 
-            for row in prev_event_id_results:
-                if row[1] not in event_results:
-                    queue.put((-row[0], row[1]))
-
-        return event_results
+        return event_id_results
 
     async def get_missing_events(self, room_id, earliest_events, latest_events, limit):
         ids = await self.db_pool.runInteraction(
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index b7554154ac..b804185c40 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -2215,9 +2215,14 @@ class PersistEventsStore:
             "   SELECT 1 FROM event_backward_extremities"
             "   WHERE event_id = ? AND room_id = ?"
             " )"
+            # 1. Don't add an event as a extremity again if we already persisted it
+            # as a non-outlier.
+            # 2. Don't add an outlier as an extremity if it has no prev_events
             " AND NOT EXISTS ("
-            "   SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
-            "   AND outlier = ?"
+            "   SELECT 1 FROM events"
+            "   LEFT JOIN event_edges edge"
+            "   ON edge.event_id = events.event_id"
+            "   WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = ? OR edge.event_id IS NULL)"
             " )"
         )
 
@@ -2243,6 +2248,10 @@ class PersistEventsStore:
                 (ev.event_id, ev.room_id)
                 for ev in events
                 if not ev.internal_metadata.is_outlier()
+                # If we encountered an event with no prev_events, then we might
+                # as well remove it now because it won't ever have anything else
+                # to backfill from.
+                or len(ev.prev_event_ids()) == 0
             ],
         )