author     Eric Eastwood <erice@element.io>    2021-10-20 16:48:23 -0500
committer  Eric Eastwood <erice@element.io>    2021-10-20 16:48:28 -0500
commit     886071b66b743c0f0f0d25866680d4ba6d1f9bc8
tree       e0db3bc5320cac60e9ffe4f3b7b1d49568abe2dc
parent     Some more trials of trying to get many many events to backfill in order on re...
Fix backfill not picking up batch events connected to non-base insertion events
Previously, we would only look for a batch event if the insertion event was
connected to something else by prev_event, which is only the case for the
base insertion event. Instead, we need to look for a batch event whenever we
come across an insertion event.
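
As a rough illustration of the change, here is a minimal, self-contained
Python sketch of the traversal. The event IDs and the in-memory lookups
(prev_events, insertions_pointing_at, batch_of) are made-up stand-ins for
Synapse's event_edges, insertion_event_edges, and batch_events tables; only
the marked branch corresponds to this fix.

from queue import PriorityQueue

INSERTION = "org.matrix.msc2716.insertion"

# event_id -> (depth, type); a toy room DAG
events = {
    "$live2": (4, "m.room.message"),
    "$live1": (3, "m.room.message"),
    "$insertion": (3, INSERTION),  # a non-base insertion event
    "$batch": (3, "org.matrix.msc2716.batch"),
    "$historical": (2, "m.room.message"),
}
prev_events = {"$live2": ["$live1"], "$batch": ["$historical"]}
insertions_pointing_at = {"$live1": ["$insertion"]}  # ~ insertion_event_edges
batch_of = {"$insertion": "$batch"}                  # ~ next_batch_id join

def backfill(start: str, limit: int) -> list:
    queue = PriorityQueue()
    queue.put((-events[start][0], start, events[start][1]))
    results = []

    def enqueue(event_id):
        if event_id not in results:
            depth, event_type = events[event_id]
            queue.put((-depth, event_id, event_type))

    while not queue.empty() and len(results) < limit:
        _, event_id, event_type = queue.get_nowait()
        if event_id in results:
            continue
        results.append(event_id)
        # Insertion events that point at this event via their prev_events.
        for insertion_id in insertions_pointing_at.get(event_id, []):
            enqueue(insertion_id)
        # The fix: chase the batch whenever the event we just dequeued is
        # itself an insertion event, instead of only for insertion events
        # that happened to be discovered through a prev_event edge.
        if event_type == INSERTION and event_id in batch_of:
            enqueue(batch_of[event_id])
        # Normal prev_event edges.
        for prev_id in prev_events.get(event_id, []):
            enqueue(prev_id)
    return results

# Walking back from $live2 now reaches $historical through the batch:
print(backfill("$live2", 10))
# -> ['$live2', '$live1', '$insertion', '$batch', '$historical']
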
-rwxr-xr-x  scripts-dev/complement.sh                            2
-rw-r--r--  synapse/handlers/federation.py                       2
-rw-r--r--  synapse/handlers/federation_event.py                 2
-rw-r--r--  synapse/storage/databases/main/event_federation.py  49
4 files changed, 34 insertions(+), 21 deletions(-)
diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh
index 89af7a4fde..549477b11f 100755
--- a/scripts-dev/complement.sh
+++ b/scripts-dev/complement.sh
@@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
 fi
 
 # Run the tests!
-go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
+go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/main_test.go ./tests/msc2716_test.go
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 355291ff45..6bb9fbfa77 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -270,6 +270,8 @@ class FederationHandler:
         # request URI to be too long.
         extremities = dict(sorted_extremeties_tuple[:5])
 
+        logger.info("backfill extremities=%s", extremities)
+
         # Now we need to decide which hosts to hit first.
 
         # First we try hosts that are already in the room
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index d9fe4a430c..177352f832 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -1276,7 +1276,7 @@ class FederationEventHandler:
         await self.persist_events_and_notify(
             room_id,
             tuple(events_to_persist),
-            # TODO: Maybe this to get fetched missing events during backfill as backfill also :/
+            # TODO: Maybe this to get fetched missing events during backfill as backfilled also :/
             backfilled=True,
         )
 
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 3d20bb8845..f7da3cd4eb 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -18,7 +18,7 @@ from typing import Collection, Dict, Iterable, List, Optional, Set, OrderedDict,
 
 from prometheus_client import Counter, Gauge
 
-from synapse.api.constants import MAX_DEPTH
+from synapse.api.constants import MAX_DEPTH, EventTypes
 from synapse.api.errors import StoreError
 from synapse.api.room_versions import EventFormatVersions, RoomVersion
 from synapse.events import EventBase, make_event_from_dict
@@ -1013,8 +1013,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         # search.
 
         # Look for the prev_event_id connected to the given event_id
-        query = """
-            SELECT depth, stream_ordering, prev_event_id FROM event_edges
+        connected_prev_event_query = """
+            SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges
             /* Get the depth and stream_ordering of the prev_event_id from the events table */
             INNER JOIN events
             ON prev_event_id = events.event_id
@@ -1029,7 +1029,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         # Look for the "insertion" events connected to the given event_id
         connected_insertion_event_query = """
-            SELECT e.depth, e.stream_ordering, i.event_id FROM insertion_event_edges AS i
+            SELECT e.depth, e.stream_ordering, i.event_id, e.type FROM insertion_event_edges AS i
             /* Get the depth of the insertion event from the events table */
             INNER JOIN events AS e USING (event_id)
             /* Find an insertion event which points via prev_events to the given event_id */
@@ -1039,7 +1039,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         # Find any batch connections of a given insertion event
         batch_connection_query = """
-            SELECT e.depth, e.stream_ordering, c.event_id FROM insertion_events AS i
+            SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
             /* Find the batch that connects to the given insertion event */
             INNER JOIN batch_events AS c
             ON i.next_batch_id = c.batch_id
@@ -1063,6 +1063,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                 table="events",
                 keyvalues={"event_id": event_id, "room_id": room_id},
                 retcols=(
+                    "type",
                     "depth",
                     "stream_ordering",
                 ),
@@ -1075,12 +1076,13 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                         -event_lookup_result["depth"],
                         -event_lookup_result["stream_ordering"],
                         event_id,
+                        event_lookup_result["type"],
                     )
                 )
 
         while not queue.empty() and len(event_results) < limit:
             try:
-                _, _, event_id = queue.get_nowait()
+                _, _, event_id, event_type = queue.get_nowait()
             except Empty:
                 break
 
@@ -1125,46 +1127,55 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             # Try and find any potential historical batches of message history.
             #
             # First we look for an insertion event connected to the current
-            # event (by prev_event). If we find any, we need to go and try to
-            # find any batch events connected to the insertion event (by
-            # batch_id). If we find any, we'll add them to the queue and
-            # navigate up the DAG like normal in the next iteration of the loop.
+            # event (by prev_event). If we find any, we'll add them to the queue
+            # and navigate up the DAG like normal in the next iteration of the
+            # loop.
             txn.execute(
                 connected_insertion_event_query, (event_id, limit - len(event_results))
             )
             connected_insertion_event_id_results = txn.fetchall()
-            logger.debug(
+            logger.info(
                 "_get_backfill_events: connected_insertion_event_query %s",
                 connected_insertion_event_id_results,
             )
             for row in connected_insertion_event_id_results:
                 connected_insertion_event_depth = row[0]
                 connected_insertion_event_stream_ordering = row[1]
-                connected_insertion_event = row[2]
-                if connected_insertion_event not in event_results:
+                connected_insertion_event_id = row[2]
+                connected_insertion_event_type = row[3]
+                if connected_insertion_event_id not in event_results:
                     queue.put(
                         (
                             -connected_insertion_event_depth,
                             -connected_insertion_event_stream_ordering,
-                            connected_insertion_event,
+                            connected_insertion_event_id,
+                            connected_insertion_event_type,
                         )
                     )
 
+            # Second, we need to go and try to find any batch events connected
+            # to a given insertion event (by batch_id). If we find any, we'll
+            # add them to the queue and navigate up the DAG like normal in the
+            # next iteration of the loop.
+            if event_type == EventTypes.MSC2716_INSERTION:
                 # Find any batch connections for the given insertion event
                 txn.execute(
                     batch_connection_query,
-                    (connected_insertion_event, limit - len(event_results)),
+                    (event_id, limit - len(event_results)),
                 )
                 batch_start_event_id_results = txn.fetchall()
-                logger.debug(
+                logger.info(
                     "_get_backfill_events: batch_start_event_id_results %s",
                     batch_start_event_id_results,
                 )
                 for row in batch_start_event_id_results:
                     if row[2] not in event_results:
-                        queue.put((-row[0], -row[1], row[2]))
+                        queue.put((-row[0], -row[1], row[2], row[3]))
 
-            txn.execute(query, (event_id, False, limit - len(event_results)))
+            txn.execute(
+                connected_prev_event_query,
+                (event_id, False, limit - len(event_results)),
+            )
             prev_event_id_results = txn.fetchall()
             logger.info(
                 "_get_backfill_events: prev_event_ids %s", prev_event_id_results
@@ -1177,7 +1188,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
             for row in prev_event_id_results:
                 if row[2] not in event_results:
-                    queue.put((-row[0], -row[1], row[2]))
+                    queue.put((-row[0], -row[1], row[2], row[3]))
 
         return event_results.values()
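
One detail worth noting about the queue entries above: queue is a
PriorityQueue, i.e. a min-heap, so depth and stream_ordering are stored
negated to pop the deepest, most recently ordered event first; the event
type added by this commit simply rides along as a fourth tuple element so
the dequeued event can be checked against EventTypes.MSC2716_INSERTION. A
small standalone demonstration (the values are made up):

from queue import PriorityQueue

q = PriorityQueue()
q.put((-3, -120, "$deep", "m.room.message"))
q.put((-1, -80, "$shallow", "m.room.message"))
q.put((-3, -150, "$deep_but_later", "org.matrix.msc2716.insertion"))

while not q.empty():
    neg_depth, neg_ordering, event_id, event_type = q.get_nowait()
    print(-neg_depth, -neg_ordering, event_id, event_type)
# 3 150 $deep_but_later org.matrix.msc2716.insertion
# 3 120 $deep m.room.message
# 1 80 $shallow m.room.message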