diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2022-04-26 10:27:11 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-26 10:27:11 +0100 |
commit | 17d99f758a768b886842cf496ff236fbe3829236 (patch) | |
tree | bd2c3b7f5cf2386d33d4224f9e2533611f288f08 /synapse/storage/databases/main | |
parent | Add a table of contents to config manual (#12527) (diff) | |
download | synapse-17d99f758a768b886842cf496ff236fbe3829236.tar.xz |
Optimise backfill calculation (#12522)
Try to avoid an OOM by checking fewer extremities. Generally this is a big rewrite of _maybe_backfill, to try and fix some of the TODOs and other problems in it. It's best reviewed commit-by-commit.
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r-- | synapse/storage/databases/main/event_federation.py | 30 |
1 files changed, 14 insertions, 16 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 634e19e035..4710224708 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -695,7 +695,9 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas # Return all events where not all sets can reach them. return {eid for eid, n in event_to_missing_sets.items() if n} - async def get_oldest_event_ids_with_depth_in_room(self, room_id) -> Dict[str, int]: + async def get_oldest_event_ids_with_depth_in_room( + self, room_id + ) -> List[Tuple[str, int]]: """Gets the oldest events(backwards extremities) in the room along with the aproximate depth. @@ -708,7 +710,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas room_id: Room where we want to find the oldest events Returns: - Map from event_id to depth + List of (event_id, depth) tuples """ def get_oldest_event_ids_with_depth_in_room_txn(txn, room_id): @@ -741,7 +743,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas txn.execute(sql, (room_id, False)) - return dict(txn) + return txn.fetchall() return await self.db_pool.runInteraction( "get_oldest_event_ids_with_depth_in_room", @@ -751,7 +753,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas async def get_insertion_event_backward_extremities_in_room( self, room_id - ) -> Dict[str, int]: + ) -> List[Tuple[str, int]]: """Get the insertion events we know about that we haven't backfilled yet. We use this function so that we can compare and see if someones current @@ -763,7 +765,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas room_id: Room where we want to find the oldest events Returns: - Map from event_id to depth + List of (event_id, depth) tuples """ def get_insertion_event_backward_extremities_in_room_txn(txn, room_id): @@ -778,8 +780,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas """ txn.execute(sql, (room_id,)) - - return dict(txn) + return txn.fetchall() return await self.db_pool.runInteraction( "get_insertion_event_backward_extremities_in_room", @@ -1295,22 +1296,19 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas event_results.reverse() return event_results - async def get_successor_events(self, event_ids: Iterable[str]) -> List[str]: - """Fetch all events that have the given events as a prev event + async def get_successor_events(self, event_id: str) -> List[str]: + """Fetch all events that have the given event as a prev event Args: - event_ids: The events to use as the previous events. + event_id: The event to search for as a prev_event. """ - rows = await self.db_pool.simple_select_many_batch( + return await self.db_pool.simple_select_onecol( table="event_edges", - column="prev_event_id", - iterable=event_ids, - retcols=("event_id",), + keyvalues={"prev_event_id": event_id}, + retcol="event_id", desc="get_successor_events", ) - return [row["event_id"] for row in rows] - @wrap_as_background_process("delete_old_forward_extrem_cache") async def _delete_old_forward_extrem_cache(self) -> None: def _delete_old_forward_extrem_cache_txn(txn): |