diff --git a/changelog.d/15382.misc b/changelog.d/15382.misc
new file mode 100644
index 0000000000..c5b054d19e
--- /dev/null
+++ b/changelog.d/15382.misc
@@ -0,0 +1 @@
+Improve DB performance of clearing out old data from `stream_ordering_to_exterm`.
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 9ded6389ac..d2063d4435 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -215,6 +215,16 @@ class DeviceWorkerHandler:
possibly_changed = set(changed)
possibly_left = set()
for room_id in rooms_changed:
+ # Check if the forward extremities have changed. If not then we know
+ # the current state won't have changed, and so we can skip this room.
+ try:
+ if not await self.store.have_room_forward_extremities_changed_since(
+ room_id, stream_ordering
+ ):
+ continue
+ except errors.StoreError:
+ pass
+
current_state_ids = await self._state_storage.get_current_state_ids(
room_id, await_full_state=False
)
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index a19ba88bf8..9e6011e8ea 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1171,6 +1171,38 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
return int(min_depth) if min_depth is not None else None
+ async def have_room_forward_extremities_changed_since(
+ self,
+ room_id: str,
+ stream_ordering: int,
+ ) -> bool:
+ """Check if the forward extremities in a room have changed since the
+ given stream ordering
+
+ Throws a StoreError if we have since purged the index for
+ stream_orderings from that point.
+ """
+
+ if stream_ordering <= self.stream_ordering_month_ago: # type: ignore[attr-defined]
+ raise StoreError(400, f"stream_ordering too old {stream_ordering}")
+
+ sql = """
+ SELECT 1 FROM stream_ordering_to_exterm
+ WHERE stream_ordering > ? AND room_id = ?
+ LIMIT 1
+ """
+
+ def have_room_forward_extremities_changed_since_txn(
+ txn: LoggingTransaction,
+ ) -> bool:
+ txn.execute(sql, (stream_ordering, room_id))
+ return txn.fetchone() is not None
+
+ return await self.db_pool.runInteraction(
+ "have_room_forward_extremities_changed_since",
+ have_room_forward_extremities_changed_since_txn,
+ )
+
@cancellable
async def get_forward_extremities_for_room_at_stream_ordering(
self, room_id: str, stream_ordering: int
@@ -1232,10 +1264,17 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
txn.execute(sql, (stream_ordering, room_id))
return [event_id for event_id, in txn]
- return await self.db_pool.runInteraction(
+ event_ids = await self.db_pool.runInteraction(
"get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
)
+ # If we didn't find any IDs, then we must have cleared out the
+ # associated `stream_ordering_to_exterm`.
+ if not event_ids:
+ raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,))
+
+ return event_ids
+
def _get_connected_batch_event_backfill_results_txn(
self, txn: LoggingTransaction, insertion_event_id: str, limit: int
) -> List[BackfillQueueNavigationItem]:
@@ -1664,19 +1703,12 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
@wrap_as_background_process("delete_old_forward_extrem_cache")
async def _delete_old_forward_extrem_cache(self) -> None:
def _delete_old_forward_extrem_cache_txn(txn: LoggingTransaction) -> None:
- # Delete entries older than a month, while making sure we don't delete
- # the only entries for a room.
sql = """
DELETE FROM stream_ordering_to_exterm
- WHERE
- room_id IN (
- SELECT room_id
- FROM stream_ordering_to_exterm
- WHERE stream_ordering > ?
- ) AND stream_ordering < ?
+ WHERE stream_ordering < ?
"""
            txn.execute(
-                sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago)  # type: ignore[attr-defined]
+                sql, (self.stream_ordering_month_ago,)  # type: ignore[attr-defined]
            )
await self.db_pool.runInteraction(
|