diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index e7570310c5..21c615432a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1265,25 +1265,9 @@ class FederationHandler:
# Synapse asks for 100 events per backfill request. Do not allow more.
limit = min(limit, 100)
- # events = await self.store.get_backfill_events(room_id, pdu_list, limit)
- # logger.info(
- # "old implementation backfill events=%s",
- # [
- # "event_id=%s,depth=%d,body=%s,prevs=%s\n"
- # % (
- # event.event_id,
- # event.depth,
- # event.content.get("body", event.type),
- # event.prev_event_ids(),
- # )
- # for event in events
- # ],
- # )
-
- events = await self.get_backfill_events(origin, room_id, pdu_list, limit)
+ events = await self.store.get_backfill_events(room_id, pdu_list, limit)
logger.info(
- "new implementation backfill events(%d)=%s",
- len(events),
+ "old implementation backfill events=%s",
[
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
% (
@@ -1296,6 +1280,22 @@ class FederationHandler:
],
)
+ # events = await self.get_backfill_events(origin, room_id, pdu_list, limit)
+ # logger.info(
+ # "new implementation backfill events(%d)=%s",
+ # len(events),
+ # [
+ # "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+ # % (
+ # event.event_id,
+ # event.depth,
+ # event.content.get("body", event.type),
+ # event.prev_event_ids(),
+ # )
+ # for event in events
+ # ],
+ # )
+
events = await filter_events_for_server(self.storage, origin, events)
return events
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 8107cfa53a..5d9fae48e9 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1106,20 +1106,22 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
_get_connected_prev_event_backfill_results_txn,
)
- async def get_backfill_events(self, room_id: str, event_list: list, limit: int):
+ async def get_backfill_events(
+ self, room_id: str, seed_event_id_list: list, limit: int
+ ):
"""Get a list of Events for a given topic that occurred before (and
- including) the events in event_list. Return a list of max size `limit`
+        including) the events in seed_event_id_list. Return at most `limit` events.
Args:
room_id
- event_list
+            seed_event_id_list: The event IDs to start backfilling from
limit
"""
event_ids = await self.db_pool.runInteraction(
"get_backfill_events",
self._get_backfill_events,
room_id,
- event_list,
+ seed_event_id_list,
limit,
)
events = await self.get_events_as_list(event_ids)
@@ -1127,10 +1129,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
events, key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering)
)
- def _get_backfill_events(self, txn, room_id, event_list, limit):
- logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)
+ def _get_backfill_events(self, txn, room_id, seed_event_id_list, limit):
+ logger.info(
+ "_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s",
+ room_id,
+ seed_event_id_list,
+ limit,
+ )
- event_results = set()
+ event_id_results = set()
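+        # This set doubles as both the accumulated results and the "visited"
+        # set for the traversal below.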
# We want to make sure that we do a breadth-first, "depth" ordered
# search.
@@ -1181,11 +1188,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
# going backwards in time. stream_ordering follows the same pattern.
queue = PriorityQueue()
- for event_id in event_list:
+ for seed_event_id in seed_event_id_list:
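+            # Fetch the depth/stream_ordering of the seed event so we can
+            # queue it in the right priority order (most recent first).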
event_lookup_result = self.db_pool.simple_select_one_txn(
txn,
table="events",
- keyvalues={"event_id": event_id, "room_id": room_id},
+ keyvalues={"event_id": seed_event_id, "room_id": room_id},
retcols=(
"type",
"depth",
@@ -1194,57 +1201,66 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
allow_none=True,
)
+            # `allow_none=True` above means the seed event may not be in our
+            # database at all; skip it rather than crash on the lookups below.
+            if event_lookup_result is None:
+                continue
+
+            logger.info(
+                "_get_backfill_events(room_id=%s): seed_event_id=%s depth=%s stream_ordering=%s type=%s",
+                room_id,
+                seed_event_id,
+                event_lookup_result["depth"],
+                event_lookup_result["stream_ordering"],
+                event_lookup_result["type"],
+            )
+
if event_lookup_result["depth"]:
queue.put(
(
-event_lookup_result["depth"],
-event_lookup_result["stream_ordering"],
- event_id,
+ seed_event_id,
event_lookup_result["type"],
)
)
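+        # Walk the DAG until we either run out of connected events or have
+        # collected `limit` event IDs.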
- while not queue.empty() and len(event_results) < limit:
+ while not queue.empty() and len(event_id_results) < limit:
try:
_, _, event_id, event_type = queue.get_nowait()
except Empty:
break
- if event_id in event_results:
+ if event_id in event_id_results:
continue
- event_results.add(event_id)
+ event_id_results.add(event_id)
+ # Try and find any potential historical batches of message history.
if self.hs.config.experimental.msc2716_enabled:
- # Try and find any potential historical batches of message history.
- #
- # First we look for an insertion event connected to the current
- # event (by prev_event). If we find any, we'll add them to the queue
- # and navigate up the DAG like normal in the next iteration of the
- # loop.
- txn.execute(
- connected_insertion_event_query,
- (event_id, limit - len(event_results)),
- )
- connected_insertion_event_id_results = txn.fetchall()
- logger.debug(
- "_get_backfill_events: connected_insertion_event_query %s",
- connected_insertion_event_id_results,
- )
- for row in connected_insertion_event_id_results:
- connected_insertion_event_depth = row[0]
- connected_insertion_event_stream_ordering = row[1]
- connected_insertion_event_id = row[2]
- connected_insertion_event_type = row[3]
- if connected_insertion_event_id not in event_results:
- queue.put(
- (
- -connected_insertion_event_depth,
- -connected_insertion_event_stream_ordering,
- connected_insertion_event_id,
- connected_insertion_event_type,
- )
- )
+ # # First we look for an insertion event connected to the current
+ # # event (by prev_event). If we find any, we'll add them to the queue
+ # # and navigate up the DAG like normal in the next iteration of the
+ # # loop.
+ # txn.execute(
+ # connected_insertion_event_query,
+ # (event_id, limit - len(event_id_results)),
+ # )
+ # connected_insertion_event_id_results = txn.fetchall()
+ # logger.debug(
+ # "_get_backfill_events(room_id=%s): connected_insertion_event_query %s",
+ # room_id,
+ # connected_insertion_event_id_results,
+ # )
+ # for row in connected_insertion_event_id_results:
+ # connected_insertion_event_depth = row[0]
+ # connected_insertion_event_stream_ordering = row[1]
+ # connected_insertion_event_id = row[2]
+ # connected_insertion_event_type = row[3]
+ # if connected_insertion_event_id not in event_id_results:
+ # queue.put(
+ # (
+ # -connected_insertion_event_depth,
+ # -connected_insertion_event_stream_ordering,
+ # connected_insertion_event_id,
+ # connected_insertion_event_type,
+ # )
+ # )
# Second, we need to go and try to find any batch events connected
# to a given insertion event (by batch_id). If we find any, we'll
@@ -1254,31 +1270,35 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
# Find any batch connections for the given insertion event
txn.execute(
batch_connection_query,
- (event_id, limit - len(event_results)),
+ (event_id, limit - len(event_id_results)),
)
batch_start_event_id_results = txn.fetchall()
logger.debug(
- "_get_backfill_events: batch_start_event_id_results %s",
+ "_get_backfill_events(room_id=%s): batch_start_event_id_results %s",
+ room_id,
batch_start_event_id_results,
)
for row in batch_start_event_id_results:
- if row[2] not in event_results:
+ if row[2] not in event_id_results:
queue.put((-row[0], -row[1], row[2], row[3]))
+ # Now we just look up the DAG by prev_events as normal
txn.execute(
connected_prev_event_query,
- (event_id, False, limit - len(event_results)),
+ (event_id, False, limit - len(event_id_results)),
)
prev_event_id_results = txn.fetchall()
logger.debug(
- "_get_backfill_events: prev_event_ids %s", prev_event_id_results
+ "_get_backfill_events(room_id=%s): prev_event_ids %s",
+ room_id,
+ prev_event_id_results,
)
for row in prev_event_id_results:
- if row[2] not in event_results:
+ if row[2] not in event_id_results:
queue.put((-row[0], -row[1], row[2], row[3]))
- return event_results
+ return event_id_results
async def get_missing_events(self, room_id, earliest_events, latest_events, limit):
ids = await self.db_pool.runInteraction(