author     Richard van der Hoff <1389908+richvdh@users.noreply.github.com>  2022-04-26 10:27:11 +0100
committer  GitHub <noreply@github.com>  2022-04-26 10:27:11 +0100
commit     17d99f758a768b886842cf496ff236fbe3829236 (patch)
tree       bd2c3b7f5cf2386d33d4224f9e2533611f288f08 /synapse/storage/databases/main/event_federation.py
parent     Add a table of contents to config manual (#12527) (diff)
Optimise backfill calculation (#12522)
Try to avoid an OOM by checking fewer extremities.

Generally this is a big rewrite of _maybe_backfill, to try and fix some of the TODOs and other problems in it. It's best reviewed commit-by-commit.
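For context, "checking fewer extremities" amounts to bounding how many backwards extremities the backfill logic probes on each pass. Below is a minimal sketch of that idea; the cap value and the pick_backfill_points helper are illustrative assumptions rather than the actual _maybe_backfill rewrite, but they consume the (event_id, depth) tuples that the storage changes in this commit start returning.

from typing import List, Tuple

# Hypothetical cap; the limit chosen by the real rewrite may differ.
MAX_EXTREMITIES_TO_CHECK = 5

def pick_backfill_points(extremities: List[Tuple[str, int]]) -> List[str]:
    """Pick a bounded set of extremities to backfill from.

    `extremities` is a list of (event_id, depth) tuples, matching the new
    return type of get_oldest_event_ids_with_depth_in_room below.
    """
    # Prefer the deepest extremities, then truncate so that a room with a
    # huge number of backwards extremities cannot blow up memory usage.
    by_depth = sorted(extremities, key=lambda pair: pair[1], reverse=True)
    return [event_id for event_id, _depth in by_depth[:MAX_EXTREMITIES_TO_CHECK]]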
Diffstat (limited to 'synapse/storage/databases/main/event_federation.py')
-rw-r--r--  synapse/storage/databases/main/event_federation.py  30
1 file changed, 14 insertions, 16 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 634e19e035..4710224708 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -695,7 +695,9 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         # Return all events where not all sets can reach them.
         return {eid for eid, n in event_to_missing_sets.items() if n}
 
-    async def get_oldest_event_ids_with_depth_in_room(self, room_id) -> Dict[str, int]:
+    async def get_oldest_event_ids_with_depth_in_room(
+        self, room_id
+    ) -> List[Tuple[str, int]]:
         """Gets the oldest events(backwards extremities) in the room along with the
         aproximate depth.
 
@@ -708,7 +710,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             room_id: Room where we want to find the oldest events
 
         Returns:
-            Map from event_id to depth
+            List of (event_id, depth) tuples
         """
 
         def get_oldest_event_ids_with_depth_in_room_txn(txn, room_id):
@@ -741,7 +743,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
 
             txn.execute(sql, (room_id, False))
 
-            return dict(txn)
+            return txn.fetchall()
 
         return await self.db_pool.runInteraction(
             "get_oldest_event_ids_with_depth_in_room",
@@ -751,7 +753,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
 
     async def get_insertion_event_backward_extremities_in_room(
         self, room_id
-    ) -> Dict[str, int]:
+    ) -> List[Tuple[str, int]]:
         """Get the insertion events we know about that we haven't backfilled yet.
 
         We use this function so that we can compare and see if someone's current
@@ -763,7 +765,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             room_id: Room where we want to find the oldest events
 
         Returns:
-            Map from event_id to depth
+            List of (event_id, depth) tuples
         """
 
         def get_insertion_event_backward_extremities_in_room_txn(txn, room_id):
@@ -778,8 +780,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             """
 
             txn.execute(sql, (room_id,))
-
-            return dict(txn)
+            return txn.fetchall()
 
         return await self.db_pool.runInteraction(
             "get_insertion_event_backward_extremities_in_room",
@@ -1295,22 +1296,19 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         event_results.reverse()
         return event_results
 
-    async def get_successor_events(self, event_ids: Iterable[str]) -> List[str]:
-        """Fetch all events that have the given events as a prev event
+    async def get_successor_events(self, event_id: str) -> List[str]:
+        """Fetch all events that have the given event as a prev event
 
         Args:
-            event_ids: The events to use as the previous events.
+            event_id: The event to search for as a prev_event.
         """
-        rows = await self.db_pool.simple_select_many_batch(
+        return await self.db_pool.simple_select_onecol(
             table="event_edges",
-            column="prev_event_id",
-            iterable=event_ids,
-            retcols=("event_id",),
+            keyvalues={"prev_event_id": event_id},
+            retcol="event_id",
             desc="get_successor_events",
         )
 
-        return [row["event_id"] for row in rows]
-
     @wrap_as_background_process("delete_old_forward_extrem_cache")
     async def _delete_old_forward_extrem_cache(self) -> None:
         def _delete_old_forward_extrem_cache_txn(txn):
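To make the interface changes concrete, here is a hypothetical caller-side sketch (the `store` argument and both function names are assumptions, not code from this commit): the two extremity queries now return (event_id, depth) tuples rather than a dict keyed by event ID, and get_successor_events is called once per event instead of with an iterable.

from typing import List, Tuple

async def collect_backfill_candidates(store, room_id: str) -> List[Tuple[str, int]]:
    # Both queries now return List[Tuple[str, int]] instead of Dict[str, int],
    # so ordering and de-duplication are left to the caller.
    oldest = await store.get_oldest_event_ids_with_depth_in_room(room_id)
    insertion_points = await store.get_insertion_event_backward_extremities_in_room(room_id)
    return list(oldest) + list(insertion_points)

async def successors_of_all(store, event_ids: List[str]) -> List[str]:
    # get_successor_events now takes a single event ID; callers that used to
    # pass an iterable loop over the IDs themselves.
    results: List[str] = []
    for event_id in event_ids:
        results.extend(await store.get_successor_events(event_id))
    return results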