-rw-r--r--  synapse/handlers/federation.py                       36
-rw-r--r--  synapse/storage/databases/main/event_federation.py  118
2 files changed, 87 insertions, 67 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index e7570310c5..21c615432a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1265,25 +1265,9 @@ class FederationHandler:
         # Synapse asks for 100 events per backfill request. Do not allow more.
         limit = min(limit, 100)
 
-        # events = await self.store.get_backfill_events(room_id, pdu_list, limit)
-        # logger.info(
-        #     "old implementation backfill events=%s",
-        #     [
-        #         "event_id=%s,depth=%d,body=%s,prevs=%s\n"
-        #         % (
-        #             event.event_id,
-        #             event.depth,
-        #             event.content.get("body", event.type),
-        #             event.prev_event_ids(),
-        #         )
-        #         for event in events
-        #     ],
-        # )
-
-        events = await self.get_backfill_events(origin, room_id, pdu_list, limit)
+        events = await self.store.get_backfill_events(room_id, pdu_list, limit)
         logger.info(
-            "new implementation backfill events(%d)=%s",
-            len(events),
+            "old implementation backfill events=%s",
             [
                 "event_id=%s,depth=%d,body=%s,prevs=%s\n"
                 % (
@@ -1296,6 +1280,22 @@ class FederationHandler:
             ],
         )
 
+        # events = await self.get_backfill_events(origin, room_id, pdu_list, limit)
+        # logger.info(
+        #     "new implementation backfill events(%d)=%s",
+        #     len(events),
+        #     [
+        #         "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+        #         % (
+        #             event.event_id,
+        #             event.depth,
+        #             event.content.get("body", event.type),
+        #             event.prev_event_ids(),
+        #         )
+        #         for event in events
+        #     ],
+        # )
+
         events = await filter_events_for_server(self.storage, origin, events)
 
         return events
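For reference, the hunk above settles on the storage-layer lookup and keeps the handler-side rewrite commented out. A minimal sketch of the resulting backfill path, assuming the interfaces shown in the diff; the function name on_backfill_request and the bare store/storage parameters are hypothetical stand-ins for the real FederationHandler plumbing, and the import path for filter_events_for_server is assumed:

    from synapse.visibility import filter_events_for_server  # assumed import path

    async def on_backfill_request(store, storage, origin, room_id, pdu_list, limit):
        # Synapse asks for 100 events per backfill request; never serve more.
        limit = min(limit, 100)
        # Walk backwards in the room DAG from the requested PDUs.
        events = await store.get_backfill_events(room_id, pdu_list, limit)
        # Strip anything the requesting server is not allowed to see.
        events = await filter_events_for_server(storage, origin, events)
        return events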
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 8107cfa53a..5d9fae48e9 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1106,20 +1106,22 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             _get_connected_prev_event_backfill_results_txn,
         )
 
-    async def get_backfill_events(self, room_id: str, event_list: list, limit: int):
+    async def get_backfill_events(
+        self, room_id: str, seed_event_id_list: list, limit: int
+    ):
         """Get a list of Events for a given topic that occurred before (and
-        including) the events in event_list. Return a list of max size `limit`
+        including) the events in seed_event_id_list. Return a list of max size `limit`
 
         Args:
             room_id
-            event_list
+            seed_event_id_list
             limit
         """
         event_ids = await self.db_pool.runInteraction(
             "get_backfill_events",
             self._get_backfill_events,
             room_id,
-            event_list,
+            seed_event_id_list,
             limit,
         )
         events = await self.get_events_as_list(event_ids)
@@ -1127,10 +1129,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             events, key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering)
         )
 
-    def _get_backfill_events(self, txn, room_id, event_list, limit):
-        logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)
+    def _get_backfill_events(self, txn, room_id, seed_event_id_list, limit):
+        logger.info(
+            "_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s",
+            room_id,
+            seed_event_id_list,
+            limit,
+        )
 
-        event_results = set()
+        event_id_results = set()
 
         # We want to make sure that we do a breadth-first, "depth" ordered
         # search.
@@ -1181,11 +1188,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         # going backwards in time. stream_ordering follows the same pattern.
         queue = PriorityQueue()
 
-        for event_id in event_list:
+        for seed_event_id in seed_event_id_list:
             event_lookup_result = self.db_pool.simple_select_one_txn(
                 txn,
                 table="events",
-                keyvalues={"event_id": event_id, "room_id": room_id},
+                keyvalues={"event_id": seed_event_id, "room_id": room_id},
                 retcols=(
                     "type",
                     "depth",
@@ -1194,57 +1201,66 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                 allow_none=True,
             )
 
+            logger.info(
+                "get_backfill_events(room_id=%s): seed_event_id=%s depth=%s stream_ordering=%s type=%s",
+                room_id,
+                seed_event_id,
+                event_lookup_result["depth"],
+                event_lookup_result["stream_ordering"],
+                event_lookup_result["type"],
+            )
+
             if event_lookup_result["depth"]:
                 queue.put(
                     (
                         -event_lookup_result["depth"],
                         -event_lookup_result["stream_ordering"],
-                        event_id,
+                        seed_event_id,
                         event_lookup_result["type"],
                     )
                 )
 
-        while not queue.empty() and len(event_results) < limit:
+        while not queue.empty() and len(event_id_results) < limit:
             try:
                 _, _, event_id, event_type = queue.get_nowait()
             except Empty:
                 break
 
-            if event_id in event_results:
+            if event_id in event_id_results:
                 continue
 
-            event_results.add(event_id)
+            event_id_results.add(event_id)
 
+            # Try and find any potential historical batches of message history.
             if self.hs.config.experimental.msc2716_enabled:
-                # Try and find any potential historical batches of message history.
-                #
-                # First we look for an insertion event connected to the current
-                # event (by prev_event). If we find any, we'll add them to the queue
-                # and navigate up the DAG like normal in the next iteration of the
-                # loop.
-                txn.execute(
-                    connected_insertion_event_query,
-                    (event_id, limit - len(event_results)),
-                )
-                connected_insertion_event_id_results = txn.fetchall()
-                logger.debug(
-                    "_get_backfill_events: connected_insertion_event_query %s",
-                    connected_insertion_event_id_results,
-                )
-                for row in connected_insertion_event_id_results:
-                    connected_insertion_event_depth = row[0]
-                    connected_insertion_event_stream_ordering = row[1]
-                    connected_insertion_event_id = row[2]
-                    connected_insertion_event_type = row[3]
-                    if connected_insertion_event_id not in event_results:
-                        queue.put(
-                            (
-                                -connected_insertion_event_depth,
-                                -connected_insertion_event_stream_ordering,
-                                connected_insertion_event_id,
-                                connected_insertion_event_type,
-                            )
-                        )
+                # # First we look for an insertion event connected to the current
+                # # event (by prev_event). If we find any, we'll add them to the queue
+                # # and navigate up the DAG like normal in the next iteration of the
+                # # loop.
+                # txn.execute(
+                #     connected_insertion_event_query,
+                #     (event_id, limit - len(event_id_results)),
+                # )
+                # connected_insertion_event_id_results = txn.fetchall()
+                # logger.debug(
+                #     "_get_backfill_events(room_id=%s): connected_insertion_event_query %s",
+                #     room_id,
+                #     connected_insertion_event_id_results,
+                # )
+                # for row in connected_insertion_event_id_results:
+                #     connected_insertion_event_depth = row[0]
+                #     connected_insertion_event_stream_ordering = row[1]
+                #     connected_insertion_event_id = row[2]
+                #     connected_insertion_event_type = row[3]
+                #     if connected_insertion_event_id not in event_id_results:
+                #         queue.put(
+                #             (
+                #                 -connected_insertion_event_depth,
+                #                 -connected_insertion_event_stream_ordering,
+                #                 connected_insertion_event_id,
+                #                 connected_insertion_event_type,
+                #             )
+                #         )
 
                 # Second, we need to go and try to find any batch events connected
                 # to a given insertion event (by batch_id). If we find any, we'll
@@ -1254,31 +1270,35 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                 # Find any batch connections for the given insertion event
                 txn.execute(
                     batch_connection_query,
-                    (event_id, limit - len(event_results)),
+                    (event_id, limit - len(event_id_results)),
                 )
                 batch_start_event_id_results = txn.fetchall()
                 logger.debug(
-                    "_get_backfill_events: batch_start_event_id_results %s",
+                    "_get_backfill_events(room_id=%s): batch_start_event_id_results %s",
+                    room_id,
                     batch_start_event_id_results,
                 )
                 for row in batch_start_event_id_results:
-                    if row[2] not in event_results:
+                    if row[2] not in event_id_results:
                         queue.put((-row[0], -row[1], row[2], row[3]))
 
+            # Now we just look up the DAG by prev_events as normal
             txn.execute(
                 connected_prev_event_query,
-                (event_id, False, limit - len(event_results)),
+                (event_id, False, limit - len(event_id_results)),
             )
             prev_event_id_results = txn.fetchall()
             logger.debug(
-                "_get_backfill_events: prev_event_ids %s", prev_event_id_results
+                "_get_backfill_events(room_id=%s): prev_event_ids %s",
+                room_id,
+                prev_event_id_results,
             )
             for row in prev_event_id_results:
-                if row[2] not in event_results:
+                if row[2] not in event_id_results:
                     queue.put((-row[0], -row[1], row[2], row[3]))
 
-        return event_results
+        return event_id_results
 
     async def get_missing_events(self, room_id, earliest_events, latest_events, limit):
         ids = await self.db_pool.runInteraction(
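Stripped of the SQL and the MSC2716 branches, the traversal these log lines instrument is a depth-ordered breadth-first walk backwards through the room DAG. A self-contained sketch, with `events` (event_id -> (depth, stream_ordering)) and `prev_events` (event_id -> earlier event_ids) as hypothetical stand-ins for the events and event_edges tables:

    from queue import Empty, PriorityQueue

    def backfill_walk(events, prev_events, seed_event_ids, limit):
        # Seed the queue with the events backfill starts from. depth and
        # stream_ordering are negated so the min-heap PriorityQueue pops the
        # deepest (most recent) event first, i.e. the walk goes backwards
        # in time.
        queue = PriorityQueue()
        for seed_event_id in seed_event_ids:
            depth, stream_ordering = events[seed_event_id]
            queue.put((-depth, -stream_ordering, seed_event_id))

        event_id_results = set()
        while not queue.empty() and len(event_id_results) < limit:
            try:
                _, _, event_id = queue.get_nowait()
            except Empty:
                break

            if event_id in event_id_results:
                continue
            event_id_results.add(event_id)

            # Follow prev_events edges up the DAG, as the real
            # connected_prev_event_query does against event_edges.
            for prev_event_id in prev_events.get(event_id, ()):
                if prev_event_id not in event_id_results:
                    depth, stream_ordering = events[prev_event_id]
                    queue.put((-depth, -stream_ordering, prev_event_id))

        return event_id_results

    # Example: three events in a line, $a <- $b <- $c; backfilling from $c
    # with limit=2 yields the two most recent events.
    events = {"$a": (1, 10), "$b": (2, 20), "$c": (3, 30)}
    prev_events = {"$b": ["$a"], "$c": ["$b"]}
    assert backfill_walk(events, prev_events, ["$c"], limit=2) == {"$b", "$c"}

Negating both components keeps ties between equal-depth events ordered by stream_ordering, matching the sort key used in get_backfill_events above (key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering)).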