author    | Eric Eastwood <erice@element.io> | 2021-10-20 16:48:23 -0500
committer | Eric Eastwood <erice@element.io> | 2021-10-20 16:48:28 -0500
commit    | 886071b66b743c0f0f0d25866680d4ba6d1f9bc8 (patch)
tree      | e0db3bc5320cac60e9ffe4f3b7b1d49568abe2dc
parent    | Some more trials of trying to get many many events to backfill in order on re... (diff)
download  | synapse-886071b66b743c0f0f0d25866680d4ba6d1f9bc8.tar.xz
Fix backfill not picking up batch events connected to non-base insertion events
Previously, we would only look for a batch event when the insertion event was connected to something else by prev_event, which is only the case for the base insertion event. Instead, we need to look for a batch event whenever we come across an insertion event.
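In other words, the batch lookup now keys off the type of the event currently being processed, rather than only firing for insertion events discovered through prev_event edges. Below is a minimal sketch of the corrected traversal loop; `find_connected_insertion_events`, `find_batch_start_events`, and `find_prev_events` are hypothetical stand-ins for the SQL queries in the real patch, and the `MSC2716_INSERTION` constant value is an assumption:

```python
from queue import Empty, PriorityQueue
from typing import Callable, Dict, List, Tuple

# Assumed value of EventTypes.MSC2716_INSERTION; illustrative only.
MSC2716_INSERTION = "org.matrix.msc2716.insertion"

# Each queue entry is (-depth, -stream_ordering, event_id, event_type) so the
# min-heap pops the deepest / most recent event first.
QueueEntry = Tuple[int, int, str, str]


def get_backfill_events(
    seeds: List[QueueEntry],
    limit: int,
    find_connected_insertion_events: Callable[[str], List[QueueEntry]],
    find_batch_start_events: Callable[[str], List[QueueEntry]],
    find_prev_events: Callable[[str], List[QueueEntry]],
) -> List[str]:
    queue: "PriorityQueue[QueueEntry]" = PriorityQueue()
    event_results: Dict[str, str] = {}

    for entry in seeds:
        queue.put(entry)

    while not queue.empty() and len(event_results) < limit:
        try:
            _, _, event_id, event_type = queue.get_nowait()
        except Empty:
            break
        if event_id in event_results:
            continue
        event_results[event_id] = event_id

        # First: look for insertion events pointing at this event via
        # prev_events, whatever kind of event this is.
        for entry in find_connected_insertion_events(event_id):
            queue.put(entry)

        # Second (the fix): whenever the event we just dequeued is itself an
        # insertion event, follow its batch edge. Previously this only
        # happened for insertion events found in the step above, i.e. only
        # for the base insertion event.
        if event_type == MSC2716_INSERTION:
            for entry in find_batch_start_events(event_id):
                queue.put(entry)

        # Finally, walk up the normal DAG via prev_events.
        for entry in find_prev_events(event_id):
            queue.put(entry)

    return list(event_results)
```

Checking the dequeued event's type means batch edges are followed for every insertion event encountered anywhere in the traversal, not just those adjacent to a prev_event edge.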
-rwxr-xr-x | scripts-dev/complement.sh                          |  2
-rw-r--r-- | synapse/handlers/federation.py                     |  2
-rw-r--r-- | synapse/handlers/federation_event.py               |  2
-rw-r--r-- | synapse/storage/databases/main/event_federation.py | 49
4 files changed, 34 insertions, 21 deletions
diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh
index 89af7a4fde..549477b11f 100755
--- a/scripts-dev/complement.sh
+++ b/scripts-dev/complement.sh
@@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
 fi
 
 # Run the tests!
-go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
+go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/main_test.go ./tests/msc2716_test.go
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 355291ff45..6bb9fbfa77 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -270,6 +270,8 @@ class FederationHandler:
         # request URI to be too long.
         extremities = dict(sorted_extremeties_tuple[:5])
 
+        logger.info("backfill extremities=%s", extremities)
+
         # Now we need to decide which hosts to hit first.
 
         # First we try hosts that are already in the room
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index d9fe4a430c..177352f832 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -1276,7 +1276,7 @@ class FederationEventHandler:
         await self.persist_events_and_notify(
             room_id,
             tuple(events_to_persist),
-            # TODO: Maybe this to get fetched missing events during backfill as backfill also :/
+            # TODO: Maybe this to get fetched missing events during backfill as backfilled also :/
             backfilled=True,
         )
 
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 3d20bb8845..f7da3cd4eb 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -18,7 +18,7 @@ from typing import Collection, Dict, Iterable, List, Optional, Set, OrderedDict,
 
 from prometheus_client import Counter, Gauge
 
-from synapse.api.constants import MAX_DEPTH
+from synapse.api.constants import MAX_DEPTH, EventTypes
 from synapse.api.errors import StoreError
 from synapse.api.room_versions import EventFormatVersions, RoomVersion
 from synapse.events import EventBase, make_event_from_dict
@@ -1013,8 +1013,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         # search.
 
         # Look for the prev_event_id connected to the given event_id
-        query = """
-            SELECT depth, stream_ordering, prev_event_id FROM event_edges
+        connected_prev_event_query = """
+            SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges
             /* Get the depth and stream_ordering of the prev_event_id from the events table */
             INNER JOIN events
             ON prev_event_id = events.event_id
@@ -1029,7 +1029,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         # Look for the "insertion" events connected to the given event_id
         connected_insertion_event_query = """
-            SELECT e.depth, e.stream_ordering, i.event_id FROM insertion_event_edges AS i
+            SELECT e.depth, e.stream_ordering, i.event_id, e.type FROM insertion_event_edges AS i
             /* Get the depth of the insertion event from the events table */
             INNER JOIN events AS e USING (event_id)
             /* Find an insertion event which points via prev_events to the given event_id */
@@ -1039,7 +1039,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         # Find any batch connections of a given insertion event
         batch_connection_query = """
-            SELECT e.depth, e.stream_ordering, c.event_id FROM insertion_events AS i
+            SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
             /* Find the batch that connects to the given insertion event */
             INNER JOIN batch_events AS c
             ON i.next_batch_id = c.batch_id
@@ -1063,6 +1063,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                 table="events",
                 keyvalues={"event_id": event_id, "room_id": room_id},
                 retcols=(
+                    "type",
                     "depth",
                     "stream_ordering",
                 ),
@@ -1075,12 +1076,13 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                         -event_lookup_result["depth"],
                         -event_lookup_result["stream_ordering"],
                         event_id,
+                        event_lookup_result["type"],
                     )
                 )
 
         while not queue.empty() and len(event_results) < limit:
             try:
-                _, _, event_id = queue.get_nowait()
+                _, _, event_id, event_type = queue.get_nowait()
             except Empty:
                 break
 
@@ -1125,46 +1127,55 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             # Try and find any potential historical batches of message history.
             #
             # First we look for an insertion event connected to the current
-            # event (by prev_event). If we find any, we need to go and try to
-            # find any batch events connected to the insertion event (by
-            # batch_id). If we find any, we'll add them to the queue and
-            # navigate up the DAG like normal in the next iteration of the loop.
+            # event (by prev_event). If we find any, we'll add them to the queue
+            # and navigate up the DAG like normal in the next iteration of the
+            # loop.
             txn.execute(
                 connected_insertion_event_query, (event_id, limit - len(event_results))
             )
             connected_insertion_event_id_results = txn.fetchall()
-            logger.debug(
+            logger.info(
                 "_get_backfill_events: connected_insertion_event_query %s",
                 connected_insertion_event_id_results,
             )
             for row in connected_insertion_event_id_results:
                 connected_insertion_event_depth = row[0]
                 connected_insertion_event_stream_ordering = row[1]
-                connected_insertion_event = row[2]
-                if connected_insertion_event not in event_results:
+                connected_insertion_event_id = row[2]
+                connected_insertion_event_type = row[3]
+                if connected_insertion_event_id not in event_results:
                     queue.put(
                         (
                             -connected_insertion_event_depth,
                             -connected_insertion_event_stream_ordering,
-                            connected_insertion_event,
+                            connected_insertion_event_id,
+                            connected_insertion_event_type,
                         )
                     )
 
+            # Second, we need to go and try to find any batch events connected
+            # to a given insertion event (by batch_id). If we find any, we'll
+            # add them to the queue and navigate up the DAG like normal in the
+            # next iteration of the loop.
+            if event_type == EventTypes.MSC2716_INSERTION:
                 # Find any batch connections for the given insertion event
                 txn.execute(
                     batch_connection_query,
-                    (connected_insertion_event, limit - len(event_results)),
+                    (event_id, limit - len(event_results)),
                 )
                 batch_start_event_id_results = txn.fetchall()
-                logger.debug(
+                logger.info(
                     "_get_backfill_events: batch_start_event_id_results %s",
                     batch_start_event_id_results,
                 )
                 for row in batch_start_event_id_results:
                     if row[2] not in event_results:
-                        queue.put((-row[0], -row[1], row[2]))
+                        queue.put((-row[0], -row[1], row[2], row[3]))
 
-            txn.execute(query, (event_id, False, limit - len(event_results)))
+            txn.execute(
+                connected_prev_event_query,
+                (event_id, False, limit - len(event_results)),
+            )
             prev_event_id_results = txn.fetchall()
             logger.info(
                 "_get_backfill_events: prev_event_ids %s", prev_event_id_results
@@ -1177,7 +1188,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
             for row in prev_event_id_results:
                 if row[2] not in event_results:
-                    queue.put((-row[0], -row[1], row[2]))
+                    queue.put((-row[0], -row[1], row[2], row[3]))
 
         return event_results.values()
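As an aside on the queue ordering used throughout this function: entries are pushed as (-depth, -stream_ordering, event_id, event_type), and Python's PriorityQueue pops the smallest tuple first, so negating depth and stream ordering yields a deepest-first walk of the DAG. A small self-contained demonstration with made-up values:

```python
from queue import PriorityQueue

q: "PriorityQueue[tuple]" = PriorityQueue()

# Negating depth and stream_ordering turns the min-heap into
# "deepest event first, ties broken by highest stream ordering".
# Event IDs and type strings here are illustrative only.
q.put((-10, -100, "$event_a", "m.room.message"))
q.put((-12, -120, "$event_b", "m.room.message"))
q.put((-12, -110, "$event_c", "org.matrix.msc2716.insertion"))

while not q.empty():
    print(q.get_nowait())
# (-12, -120, '$event_b', 'm.room.message')
# (-12, -110, '$event_c', 'org.matrix.msc2716.insertion')
# (-10, -100, '$event_a', 'm.room.message')
```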