diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 21c615432a..2dc5e64a39 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1043,217 +1043,6 @@ class FederationHandler:
else:
return []
- async def get_backfill_events(
- self, origin: str, room_id: str, event_id_list: list, limit: int
- ) -> List[EventBase]:
- logger.info(
- "get_backfill_events(room_id=%s): seeding backfill with event_id_list=%s limit=%s origin=%s",
- room_id,
- event_id_list,
- limit,
- origin,
- )
-
- event_id_results = set()
-
- # In a PriorityQueue, the lowest valued entries are retrieved first.
- # We're using depth as the priority in the queue and tie-break based on
- # stream_ordering. Depth is lowest at the oldest-in-time message and
-        # highest at the newest-in-time message. We add events to the queue with a
- # negative depth so that we process the newest-in-time messages first
- # going backwards in time. stream_ordering follows the same pattern.
- queue = PriorityQueue()
- seed_events = await self.store.get_events_as_list(event_id_list)
- logger.info(
- "get_backfill_events(room_id=%s): seed_events=%s",
- room_id,
- [
- BackfillQueueNavigationItem(
- depth=seed_event.depth,
- stream_ordering=seed_event.internal_metadata.stream_ordering,
- event_id=seed_event.event_id,
- type=seed_event.type,
- )
- for seed_event in seed_events
- ],
- )
- for seed_event in seed_events:
- # Make sure the seed event actually pertains to this room. We also
- # need to make sure the depth is available since our whole DAG
- # navigation here depends on depth.
- if seed_event.room_id == room_id and seed_event.depth:
- queue.put(
- (
- -seed_event.depth,
- -seed_event.internal_metadata.stream_ordering,
- seed_event.event_id,
- seed_event.type,
- )
- )
-
- while not queue.empty() and len(event_id_results) < limit:
- try:
- _, _, event_id, event_type = queue.get_nowait()
- except Empty:
- break
-
- if event_id in event_id_results:
- continue
-
- found_undiscovered_connected_historical_messages = False
- if self.hs.config.experimental.msc2716_enabled:
- # Try and find any potential historical batches of message history.
- #
- # First we look for an insertion event connected to the current
- # event (by prev_event). If we find any, we'll add them to the queue
- # and navigate up the DAG like normal in the next iteration of the
- # loop.
- connected_insertion_event_backfill_results = (
- await self.store.get_connected_insertion_event_backfill_results(
- event_id, limit - len(event_id_results)
- )
- )
- logger.info(
- "get_backfill_events(room_id=%s): connected_insertion_event_backfill_results(%s)=%s",
- room_id,
- event_id,
- connected_insertion_event_backfill_results,
- )
- for (
- connected_insertion_event_backfill_item
- ) in connected_insertion_event_backfill_results:
- if (
- connected_insertion_event_backfill_item.event_id
- not in event_id_results
- ):
- # Check whether the insertion event is already on the
- # federating homeserver we're trying to send backfill
- # events to
- room_version = await self.store.get_room_version(room_id)
- event_exists_on_remote_server = None
- try:
- # Because of the nature of backfill giving events to
- # the federated homeserver in one chunk and then we
- # can possibly query about that same event in the
- # next chunk, we need to avoid getting a cached
- # response. We want to know *now* whether they have
- # backfilled the insertion event.
- event_exists_on_remote_server = await self.federation_client.get_pdu_from_destination_raw(
- origin,
- connected_insertion_event_backfill_item.event_id,
- room_version=room_version,
- outlier=True,
- timeout=10000,
- )
- except Exception as e:
- logger.info(
- "get_backfill_events(room_id=%s): Failed to fetch insertion event_id=%s from origin=%s but we're just going to assume it's not backfilled there yet. error=%s",
- room_id,
- connected_insertion_event_backfill_item.event_id,
- origin,
- e,
- )
-
- logger.info(
- "get_backfill_events(room_id=%s): checked if insertion event_id=%s exists on federated homeserver(origin=%s) already? event_exists_on_remote_server=%s",
- room_id,
- connected_insertion_event_backfill_item.event_id,
- origin,
- event_exists_on_remote_server,
- )
-
- # If the event is already on the federated homeserver,
- # we don't need to try to branch off onto this
- # historical chain of messages. Below, we will instead
- # just go up the `prev_events` as normal.
- #
- # This is important so that the first time we backfill
- # the federated homeserver, we jump off and go down the
- # historical branch. But after the historical branch is
- # exhausted and the event comes up again in backfill, we
- # will choose the "live" DAG.
- if not event_exists_on_remote_server:
- found_undiscovered_connected_historical_messages = True
- queue.put(
- (
- -connected_insertion_event_backfill_item.depth,
- -connected_insertion_event_backfill_item.stream_ordering,
- connected_insertion_event_backfill_item.event_id,
- connected_insertion_event_backfill_item.type,
- )
- )
-
- # Second, we need to go and try to find any batch events connected
- # to a given insertion event (by batch_id). If we find any, we'll
- # add them to the queue and navigate up the DAG like normal in the
- # next iteration of the loop.
- if event_type == EventTypes.MSC2716_INSERTION:
- connected_batch_event_backfill_results = (
- await self.store.get_connected_batch_event_backfill_results(
- event_id, limit - len(event_id_results)
- )
- )
- logger.info(
- "get_backfill_events(room_id=%s): connected_batch_event_backfill_results(%s)=%s",
- room_id,
- event_id,
- connected_batch_event_backfill_results,
- )
- for (
- connected_batch_event_backfill_item
- ) in connected_batch_event_backfill_results:
- if (
- connected_batch_event_backfill_item.event_id
- not in event_id_results
- ):
- queue.put(
- (
- -connected_batch_event_backfill_item.depth,
- -connected_batch_event_backfill_item.stream_ordering,
- connected_batch_event_backfill_item.event_id,
- connected_batch_event_backfill_item.type,
- )
- )
-
-            # If we found a historical branch of history off of the message, let's
- # navigate down that in the next iteration of the loop instead of
- # the normal prev_event chain.
- if not found_undiscovered_connected_historical_messages:
- event_id_results.add(event_id)
-
- # Now we just look up the DAG by prev_events as normal
- connected_prev_event_backfill_results = (
- await self.store.get_connected_prev_event_backfill_results(
- event_id, limit - len(event_id_results)
- )
- )
- logger.info(
- "get_backfill_events(room_id=%s): connected_prev_event_backfill_results(%s)=%s",
- room_id,
- event_id,
- connected_prev_event_backfill_results,
- )
- for (
- connected_prev_event_backfill_item
- ) in connected_prev_event_backfill_results:
- if (
- connected_prev_event_backfill_item.event_id
- not in event_id_results
- ):
- queue.put(
- (
- -connected_prev_event_backfill_item.depth,
- -connected_prev_event_backfill_item.stream_ordering,
- connected_prev_event_backfill_item.event_id,
- connected_prev_event_backfill_item.type,
- )
- )
-
- events = await self.store.get_events_as_list(event_id_results)
- return sorted(
- events, key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering)
- )
-
@log_function
async def on_backfill_request(
self, origin: str, room_id: str, pdu_list: List[str], limit: int
@@ -1280,22 +1069,6 @@ class FederationHandler:
],
)
- # events = await self.get_backfill_events(origin, room_id, pdu_list, limit)
- # logger.info(
- # "new implementation backfill events(%d)=%s",
- # len(events),
- # [
- # "event_id=%s,depth=%d,body=%s,prevs=%s\n"
- # % (
- # event.event_id,
- # event.depth,
- # event.content.get("body", event.type),
- # event.prev_event_ids(),
- # )
- # for event in events
- # ],
- # )
-
events = await filter_events_for_server(self.storage, origin, events)
return events
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 5d9fae48e9..299af0ded2 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -995,116 +995,70 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
"get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
)
- async def get_connected_insertion_event_backfill_results(
- self, event_id: str, limit: int
- ) -> List[BackfillQueueNavigationItem]:
- def _get_connected_insertion_event_backfill_results_txn(txn):
- # Look for the "insertion" events connected to the given event_id
- connected_insertion_event_query = """
- SELECT e.depth, e.stream_ordering, i.event_id, e.type FROM insertion_event_edges AS i
- /* Get the depth of the insertion event from the events table */
- INNER JOIN events AS e USING (event_id)
- /* Find an insertion event which points via prev_events to the given event_id */
- WHERE i.insertion_prev_event_id = ?
- LIMIT ?
- """
-
- txn.execute(
- connected_insertion_event_query,
- (event_id, limit),
- )
- connected_insertion_event_id_results = txn.fetchall()
- return [
- BackfillQueueNavigationItem(
- depth=row[0],
- stream_ordering=row[1],
- event_id=row[2],
- type=row[3],
- )
- for row in connected_insertion_event_id_results
- ]
+    def _get_connected_batch_event_backfill_results_txn(
+        self, txn: LoggingTransaction, insertion_event_id: str, limit: int
+    ) -> List[BackfillQueueNavigationItem]:
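+        """Look up the batch events connected to the given insertion event
+        (i.e. those whose batch_id matches the insertion event's next_batch_id).
+
+        Args:
+            txn: The database transaction to use.
+            insertion_event_id: The insertion event to find connected batch events for.
+            limit: Maximum number of results to return.
+
+        Returns:
+            List of batch start events navigable from the given insertion event.
+        """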
+ # Find any batch connections of a given insertion event
+ batch_connection_query = """
+ SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
+ /* Find the batch that connects to the given insertion event */
+ INNER JOIN batch_events AS c
+ ON i.next_batch_id = c.batch_id
+ /* Get the depth of the batch start event from the events table */
+ INNER JOIN events AS e USING (event_id)
+ /* Find an insertion event which matches the given event_id */
+ WHERE i.event_id = ?
+ LIMIT ?
+ """
- return await self.db_pool.runInteraction(
- "get_connected_insertion_event_backfill_results",
- _get_connected_insertion_event_backfill_results_txn,
+ # Find any batch connections for the given insertion event
+ txn.execute(
+ batch_connection_query,
+ (insertion_event_id, limit),
)
-
- async def get_connected_batch_event_backfill_results(
- self, insertion_event_id: str, limit: int
- ) -> List[BackfillQueueNavigationItem]:
- def _get_connected_batch_event_backfill_results_txn(txn):
- # Find any batch connections of a given insertion event
- batch_connection_query = """
- SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
- /* Find the batch that connects to the given insertion event */
- INNER JOIN batch_events AS c
- ON i.next_batch_id = c.batch_id
- /* Get the depth of the batch start event from the events table */
- INNER JOIN events AS e USING (event_id)
- /* Find an insertion event which matches the given event_id */
- WHERE i.event_id = ?
- LIMIT ?
- """
-
- # Find any batch connections for the given insertion event
- txn.execute(
- batch_connection_query,
- (insertion_event_id, limit),
+ batch_start_event_id_results = txn.fetchall()
+ return [
+ BackfillQueueNavigationItem(
+ depth=row[0],
+ stream_ordering=row[1],
+ event_id=row[2],
+ type=row[3],
)
- batch_start_event_id_results = txn.fetchall()
- return [
- BackfillQueueNavigationItem(
- depth=row[0],
- stream_ordering=row[1],
- event_id=row[2],
- type=row[3],
- )
- for row in batch_start_event_id_results
- ]
+ for row in batch_start_event_id_results
+ ]
- return await self.db_pool.runInteraction(
- "get_connected_batch_event_backfill_results",
- _get_connected_batch_event_backfill_results_txn,
- )
-
- async def get_connected_prev_event_backfill_results(
- self, event_id: str, limit: int
- ) -> List[BackfillQueueNavigationItem]:
- def _get_connected_prev_event_backfill_results_txn(txn):
- # Look for the prev_event_id connected to the given event_id
- connected_prev_event_query = """
- SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges
- /* Get the depth and stream_ordering of the prev_event_id from the events table */
- INNER JOIN events
- ON prev_event_id = events.event_id
- /* Look for an edge which matches the given event_id */
- WHERE event_edges.event_id = ?
- AND event_edges.is_state = ?
- /* Because we can have many events at the same depth,
- * we want to also tie-break and sort on stream_ordering */
- ORDER BY depth DESC, stream_ordering DESC
- LIMIT ?
- """
-
- txn.execute(
- connected_prev_event_query,
- (event_id, False, limit),
- )
- prev_event_id_results = txn.fetchall()
- return [
- BackfillQueueNavigationItem(
- depth=row[0],
- stream_ordering=row[1],
- event_id=row[2],
- type=row[3],
- )
- for row in prev_event_id_results
- ]
+    def _get_connected_prev_event_backfill_results_txn(
+        self, txn: LoggingTransaction, event_id: str, limit: int
+    ) -> List[BackfillQueueNavigationItem]:
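+        """Look up the non-state prev_events pointed to by the given event.
+
+        Args:
+            txn: The database transaction to use.
+            event_id: The event to find prev_events for.
+            limit: Maximum number of results to return.
+
+        Returns:
+            List of prev events, ordered by depth (descending) and tie-broken
+            by stream_ordering (descending).
+        """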
+ # Look for the prev_event_id connected to the given event_id
+ connected_prev_event_query = """
+ SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges
+ /* Get the depth and stream_ordering of the prev_event_id from the events table */
+ INNER JOIN events
+ ON prev_event_id = events.event_id
+ /* Look for an edge which matches the given event_id */
+ WHERE event_edges.event_id = ?
+ AND event_edges.is_state = ?
+ /* Because we can have many events at the same depth,
+ * we want to also tie-break and sort on stream_ordering */
+ ORDER BY depth DESC, stream_ordering DESC
+ LIMIT ?
+ """
- return await self.db_pool.runInteraction(
- "get_connected_prev_event_backfill_results",
- _get_connected_prev_event_backfill_results_txn,
+ txn.execute(
+ connected_prev_event_query,
+ (event_id, False, limit),
)
+ prev_event_id_results = txn.fetchall()
+ return [
+ BackfillQueueNavigationItem(
+ depth=row[0],
+ stream_ordering=row[1],
+ event_id=row[2],
+ type=row[3],
+ )
+ for row in prev_event_id_results
+ ]
async def get_backfill_events(
self, room_id: str, seed_event_id_list: list, limit: int
@@ -1130,6 +1084,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
)
def _get_backfill_events(self, txn, room_id, seed_event_id_list, limit):
+        """
+        Navigate the room DAG backwards from the given seed events with a
+        breadth-first, "depth"-ordered search (tie-broken by stream_ordering).
+        This also handles navigating any historical branches of history
+        connected by MSC2716 insertion and batch events.
+        """
logger.info(
"_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s",
room_id,
@@ -1139,47 +1098,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
event_id_results = set()
- # We want to make sure that we do a breadth-first, "depth" ordered
- # search.
-
- # Look for the prev_event_id connected to the given event_id
- connected_prev_event_query = """
- SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges
- /* Get the depth and stream_ordering of the prev_event_id from the events table */
- INNER JOIN events
- ON prev_event_id = events.event_id
- /* Look for an edge which matches the given event_id */
- WHERE event_edges.event_id = ?
- AND event_edges.is_state = ?
- /* Because we can have many events at the same depth,
- * we want to also tie-break and sort on stream_ordering */
- ORDER BY depth DESC, stream_ordering DESC
- LIMIT ?
- """
-
- # Look for the "insertion" events connected to the given event_id
- connected_insertion_event_query = """
- SELECT e.depth, e.stream_ordering, i.event_id, e.type FROM insertion_event_edges AS i
- /* Get the depth of the insertion event from the events table */
- INNER JOIN events AS e USING (event_id)
- /* Find an insertion event which points via prev_events to the given event_id */
- WHERE i.insertion_prev_event_id = ?
- LIMIT ?
- """
-
- # Find any batch connections of a given insertion event
- batch_connection_query = """
- SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
- /* Find the batch that connects to the given insertion event */
- INNER JOIN batch_events AS c
- ON i.next_batch_id = c.batch_id
- /* Get the depth of the batch start event from the events table */
- INNER JOIN events AS e USING (event_id)
- /* Find an insertion event which matches the given event_id */
- WHERE i.event_id = ?
- LIMIT ?
- """
-
# In a PriorityQueue, the lowest valued entries are retrieved first.
# We're using depth as the priority in the queue and tie-break based on
# stream_ordering. Depth is lowest at the oldest-in-time message and
@@ -1233,70 +1151,61 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
# Try and find any potential historical batches of message history.
if self.hs.config.experimental.msc2716_enabled:
- # # First we look for an insertion event connected to the current
- # # event (by prev_event). If we find any, we'll add them to the queue
- # # and navigate up the DAG like normal in the next iteration of the
- # # loop.
- # txn.execute(
- # connected_insertion_event_query,
- # (event_id, limit - len(event_id_results)),
- # )
- # connected_insertion_event_id_results = txn.fetchall()
- # logger.debug(
- # "_get_backfill_events(room_id=%s): connected_insertion_event_query %s",
- # room_id,
- # connected_insertion_event_id_results,
- # )
- # for row in connected_insertion_event_id_results:
- # connected_insertion_event_depth = row[0]
- # connected_insertion_event_stream_ordering = row[1]
- # connected_insertion_event_id = row[2]
- # connected_insertion_event_type = row[3]
- # if connected_insertion_event_id not in event_id_results:
- # queue.put(
- # (
- # -connected_insertion_event_depth,
- # -connected_insertion_event_stream_ordering,
- # connected_insertion_event_id,
- # connected_insertion_event_type,
- # )
- # )
-
- # Second, we need to go and try to find any batch events connected
+            # Try to find any batch events connected
# to a given insertion event (by batch_id). If we find any, we'll
# add them to the queue and navigate up the DAG like normal in the
# next iteration of the loop.
if event_type == EventTypes.MSC2716_INSERTION:
# Find any batch connections for the given insertion event
- txn.execute(
- batch_connection_query,
- (event_id, limit - len(event_id_results)),
+ connected_batch_event_backfill_results = (
+ self._get_connected_batch_event_backfill_results_txn(
+ txn, event_id, limit - len(event_id_results)
+ )
)
- batch_start_event_id_results = txn.fetchall()
logger.debug(
- "_get_backfill_events(room_id=%s): batch_start_event_id_results %s",
+ "_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s",
room_id,
- batch_start_event_id_results,
+ connected_batch_event_backfill_results,
)
- for row in batch_start_event_id_results:
- if row[2] not in event_id_results:
- queue.put((-row[0], -row[1], row[2], row[3]))
+ for (
+ connected_batch_event_backfill_item
+ ) in connected_batch_event_backfill_results:
+ if (
+ connected_batch_event_backfill_item.event_id
+ not in event_id_results
+ ):
+ queue.put(
+ (
+ -connected_batch_event_backfill_item.depth,
+ -connected_batch_event_backfill_item.stream_ordering,
+ connected_batch_event_backfill_item.event_id,
+ connected_batch_event_backfill_item.type,
+ )
+ )
# Now we just look up the DAG by prev_events as normal
- txn.execute(
- connected_prev_event_query,
- (event_id, False, limit - len(event_id_results)),
+ connected_prev_event_backfill_results = (
+ self._get_connected_prev_event_backfill_results_txn(
+ txn, event_id, limit - len(event_id_results)
+ )
)
- prev_event_id_results = txn.fetchall()
logger.debug(
- "_get_backfill_events(room_id=%s): prev_event_ids %s",
+ "_get_backfill_events(room_id=%s): connected_prev_event_backfill_results=%s",
room_id,
- prev_event_id_results,
+ connected_prev_event_backfill_results,
)
-
- for row in prev_event_id_results:
- if row[2] not in event_id_results:
- queue.put((-row[0], -row[1], row[2], row[3]))
+ for (
+ connected_prev_event_backfill_item
+ ) in connected_prev_event_backfill_results:
+ if connected_prev_event_backfill_item.event_id not in event_id_results:
+ queue.put(
+ (
+ -connected_prev_event_backfill_item.depth,
+ -connected_prev_event_backfill_item.stream_ordering,
+ connected_prev_event_backfill_item.event_id,
+ connected_prev_event_backfill_item.type,
+ )
+ )
return event_id_results