diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 2ab4dec88f..eafffff2bc 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -264,6 +264,62 @@ class FederationClient(FederationBase):
return pdus
+ async def get_pdu_from_destination_raw(
+ self,
+ destination: str,
+ event_id: str,
+ room_version: RoomVersion,
+ outlier: bool = False,
+ timeout: Optional[int] = None,
+ ) -> Optional[EventBase]:
+        """Requests the PDU with the given event ID from the destination
+        homeserver.
+
+ Does not have any caching or rate limiting!
+
+ Args:
+ destination: Which homeserver to query
+ event_id: event to fetch
+ room_version: version of the room
+ outlier: Indicates whether the PDU is an `outlier`, i.e. if
+ it's from an arbitrary point in the context as opposed to part
+ of the current block of PDUs. Defaults to `False`
+            timeout: How long to wait (in ms) for the destination to respond.
+                None indicates no timeout.
+
+ Returns:
+ The requested PDU, or None if we were unable to find it.
+
+ Raises:
+ SynapseError, NotRetryingDestination, FederationDeniedError
+ """
+
+ signed_pdu = None
+
+ transaction_data = await self.transport_layer.get_event(
+ destination, event_id, timeout=timeout
+ )
+
+ logger.info(
+ "retrieved event id %s from %s: %r",
+ event_id,
+ destination,
+ transaction_data,
+ )
+
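+        # The response to `/event/{event_id}` is a transaction whose `pdus`
+        # list contains at most the single requested event.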
+ pdu_list: List[EventBase] = [
+ event_from_pdu_json(p, room_version, outlier=outlier)
+ for p in transaction_data["pdus"]
+ ]
+
+ if pdu_list and pdu_list[0]:
+ pdu = pdu_list[0]
+
+ # Check signatures are correct.
+ signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
+
+ return signed_pdu
+
async def get_pdu(
self,
destinations: Iterable[str],
@@ -308,30 +364,14 @@ class FederationClient(FederationBase):
continue
try:
- transaction_data = await self.transport_layer.get_event(
- destination, event_id, timeout=timeout
- )
-
- logger.debug(
- "retrieved event id %s from %s: %r",
- event_id,
- destination,
- transaction_data,
+ signed_pdu = await self.get_pdu_from_destination_raw(
+ destination=destination,
+ event_id=event_id,
+ room_version=room_version,
+ outlier=outlier,
+ timeout=timeout,
                )
+
+                if signed_pdu:
+                    break
+
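+                # The event wasn't returned; record the attempt so we don't
+                # ask this destination again within PDU_RETRY_TIME_MS.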
- pdu_list: List[EventBase] = [
- event_from_pdu_json(p, room_version, outlier=outlier)
- for p in transaction_data["pdus"]
- ]
-
- if pdu_list and pdu_list[0]:
- pdu = pdu_list[0]
-
- # Check signatures are correct.
- signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
-
- break
-
pdu_attempts[destination] = now
except SynapseError as e:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index e28c74daf0..c2163c7e53 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -62,6 +62,7 @@ from synapse.types import JsonDict, StateMap, get_domain_from_id
+from synapse.storage.databases.main.event_federation import BackfillQueueNavigationItem
 from synapse.util.async_helpers import Linearizer
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.visibility import filter_events_for_server
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -1043,8 +1044,16 @@ class FederationHandler:
return []
async def get_backfill_events(
- self, room_id: str, event_id_list: list, limit: int
+        self, origin: str, room_id: str, event_id_list: List[str], limit: int
) -> List[EventBase]:
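+        """Walk the room DAG backwards from the given seed events, collecting
+        up to `limit` events to return to the requesting homeserver
+        (`origin`). Branches off into any connected MSC2716 historical
+        batches that `origin` has not discovered yet.
+        """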
+ logger.info(
+ "get_backfill_events(room_id=%s): seeding backfill with event_id_list=%s limit=%s origin=%s",
+ room_id,
+ event_id_list,
+ limit,
+ origin,
+ )
+
event_id_results = set()
# In a PriorityQueue, the lowest valued entries are retrieved first.
@@ -1054,8 +1063,20 @@ class FederationHandler:
# negative depth so that we process the newest-in-time messages first
# going backwards in time. stream_ordering follows the same pattern.
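+        #
+        # Each queue entry is a tuple of the form
+        # (-depth, -stream_ordering, event_id, type).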
queue = PriorityQueue()
-
seed_events = await self.store.get_events_as_list(event_id_list)
+ logger.info(
+ "get_backfill_events(room_id=%s): seed_events=%s",
+ room_id,
+ [
+ BackfillQueueNavigationItem(
+ depth=seed_event.depth,
+ stream_ordering=seed_event.internal_metadata.stream_ordering,
+ event_id=seed_event.event_id,
+ type=seed_event.type,
+ )
+ for seed_event in seed_events
+ ],
+ )
for seed_event in seed_events:
# Make sure the seed event actually pertains to this room. We also
# need to make sure the depth is available since our whole DAG
@@ -1079,8 +1100,7 @@ class FederationHandler:
if event_id in event_id_results:
continue
- event_id_results.add(event_id)
-
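+            # Tracks whether this event has a connected historical branch that
+            # `origin` has not discovered yet. If it does, we traverse the
+            # branch instead of adding this event to the results; a later
+            # backfill request will revisit this event once the branch has
+            # been exhausted.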
+ found_undiscovered_connected_historical_messages = False
if self.hs.config.experimental.msc2716_enabled:
# Try and find any potential historical batches of message history.
#
@@ -1093,8 +1113,10 @@ class FederationHandler:
event_id, limit - len(event_id_results)
)
)
- logger.debug(
- "_get_backfill_events: connected_insertion_event_backfill_results=%s",
+ logger.info(
+ "get_backfill_events(room_id=%s): connected_insertion_event_backfill_results(%s)=%s",
+ room_id,
+ event_id,
connected_insertion_event_backfill_results,
)
for (
@@ -1104,15 +1126,63 @@ class FederationHandler:
connected_insertion_event_backfill_item.event_id
not in event_id_results
):
- queue.put(
- (
- -connected_insertion_event_backfill_item.depth,
- -connected_insertion_event_backfill_item.stream_ordering,
+ # Check whether the insertion event is already on the
+ # federating homeserver we're trying to send backfill
+ # events to
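+                        # (`get_room_version` is cached, so fetching it on
+                        # every iteration is cheap.)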
+ room_version = await self.store.get_room_version(room_id)
+ event_exists_on_remote_server = None
+                        try:
+                            # Backfill hands events to the remote in chunks,
+                            # and we may ask about the same event again while
+                            # serving a later chunk, so we must avoid a
+                            # cached response: we want to know *now* whether
+                            # the remote has already backfilled the insertion
+                            # event.
+ event_exists_on_remote_server = await self.federation_client.get_pdu_from_destination_raw(
+ origin,
connected_insertion_event_backfill_item.event_id,
- connected_insertion_event_backfill_item.type,
+ room_version=room_version,
+ outlier=True,
+ timeout=10000,
)
+ except Exception as e:
+ logger.info(
+                                "get_backfill_events(room_id=%s): failed to fetch insertion event_id=%s from origin=%s; assuming it has not been backfilled there yet. error=%s",
+ room_id,
+ connected_insertion_event_backfill_item.event_id,
+ origin,
+ e,
+ )
+
+ logger.info(
+                            "get_backfill_events(room_id=%s): does insertion event_id=%s already exist on the federated homeserver (origin=%s)? event_exists_on_remote_server=%s",
+ room_id,
+ connected_insertion_event_backfill_item.event_id,
+ origin,
+ event_exists_on_remote_server,
)
+ # If the event is already on the federated homeserver,
+ # we don't need to try to branch off onto this
+ # historical chain of messages. Below, we will instead
+ # just go up the `prev_events` as normal.
+ #
+ # This is important so that the first time we backfill
+ # the federated homeserver, we jump off and go down the
+ # historical branch. But after the historical branch is
+ # exhausted and the event comes up again in backfill, we
+ # will choose the "live" DAG.
+ if not event_exists_on_remote_server:
+ found_undiscovered_connected_historical_messages = True
+ queue.put(
+ (
+ -connected_insertion_event_backfill_item.depth,
+ -connected_insertion_event_backfill_item.stream_ordering,
+ connected_insertion_event_backfill_item.event_id,
+ connected_insertion_event_backfill_item.type,
+ )
+ )
+
# Second, we need to go and try to find any batch events connected
# to a given insertion event (by batch_id). If we find any, we'll
# add them to the queue and navigate up the DAG like normal in the
@@ -1123,8 +1193,10 @@ class FederationHandler:
event_id, limit - len(event_id_results)
)
)
- logger.debug(
- "_get_backfill_events: connected_batch_event_backfill_results %s",
+ logger.info(
+ "get_backfill_events(room_id=%s): connected_batch_event_backfill_results(%s)=%s",
+ room_id,
+ event_id,
connected_batch_event_backfill_results,
)
for (
@@ -1143,28 +1215,39 @@ class FederationHandler:
)
)
- # Now we just look up the DAG by prev_events as normal
- connected_prev_event_backfill_results = (
- await self.store.get_connected_prev_event_backfill_results(
- event_id, limit - len(event_id_results)
+            # If we found an undiscovered historical branch hanging off this
+            # message, navigate down that branch in the next iteration of the
+            # loop instead of the normal prev_event chain.
+ if not found_undiscovered_connected_historical_messages:
+ event_id_results.add(event_id)
+
+ # Now we just look up the DAG by prev_events as normal
+ connected_prev_event_backfill_results = (
+ await self.store.get_connected_prev_event_backfill_results(
+ event_id, limit - len(event_id_results)
+ )
)
- )
- logger.debug(
- "_get_backfill_events: prev_event_ids %s",
- connected_prev_event_backfill_results,
- )
- for (
- connected_prev_event_backfill_item
- ) in connected_prev_event_backfill_results:
- if connected_prev_event_backfill_item.event_id not in event_id_results:
- queue.put(
- (
- -connected_prev_event_backfill_item.depth,
- -connected_prev_event_backfill_item.stream_ordering,
- connected_prev_event_backfill_item.event_id,
- connected_prev_event_backfill_item.type,
+ logger.info(
+ "get_backfill_events(room_id=%s): connected_prev_event_backfill_results(%s)=%s",
+ room_id,
+ event_id,
+ connected_prev_event_backfill_results,
+ )
+ for (
+ connected_prev_event_backfill_item
+ ) in connected_prev_event_backfill_results:
+ if (
+ connected_prev_event_backfill_item.event_id
+ not in event_id_results
+ ):
+ queue.put(
+ (
+ -connected_prev_event_backfill_item.depth,
+ -connected_prev_event_backfill_item.stream_ordering,
+ connected_prev_event_backfill_item.event_id,
+ connected_prev_event_backfill_item.type,
+ )
)
- )
events = await self.store.get_events_as_list(event_id_results)
return sorted(
@@ -1182,24 +1265,25 @@ class FederationHandler:
# Synapse asks for 100 events per backfill request. Do not allow more.
limit = min(limit, 100)
- events = await self.store.get_backfill_events(room_id, pdu_list, limit)
- logger.info(
- "old implementation backfill events=%s",
- [
- "event_id=%s,depth=%d,body=%s,prevs=%s\n"
- % (
- event.event_id,
- event.depth,
- event.content.get("body", event.type),
- event.prev_event_ids(),
- )
- for event in events
- ],
- )
-
- events = await self.get_backfill_events(room_id, pdu_list, limit)
+
+ events = await self.get_backfill_events(origin, room_id, pdu_list, limit)
logger.info(
- "new implementation backfill events=%s",
+            "backfill events(%d)=%s",
+ len(events),
[
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
% (