summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-04-06 17:42:39 +0100
committerGitHub <noreply@github.com>2023-04-06 16:42:39 +0000
commit485b9fdefb9f45df172ff5044d6a02a177b7de19 (patch)
tree6046f925f91a4d54eacedfb22219b480c03be427 /synapse
parentMerge remote-tracking branch 'origin/release-v1.81' into develop (diff)
downloadsynapse-485b9fdefb9f45df172ff5044d6a02a177b7de19.tar.xz
Don't keep old stream_ordering_to_exterm around (#15382)
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/device.py10
-rw-r--r--synapse/storage/databases/main/event_federation.py52
2 files changed, 52 insertions, 10 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 9ded6389ac..d2063d4435 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -215,6 +215,16 @@ class DeviceWorkerHandler:
         possibly_changed = set(changed)
         possibly_left = set()
         for room_id in rooms_changed:
+            # Check if the forward extremities have changed. If not then we know
+            # the current state won't have changed, and so we can skip this room.
+            try:
+                if not await self.store.have_room_forward_extremities_changed_since(
+                    room_id, stream_ordering
+                ):
+                    continue
+            except errors.StoreError:
+                pass
+
             current_state_ids = await self._state_storage.get_current_state_ids(
                 room_id, await_full_state=False
             )
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index a19ba88bf8..9e6011e8ea 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1171,6 +1171,38 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
 
         return int(min_depth) if min_depth is not None else None
 
+    async def have_room_forward_extremities_changed_since(
+        self,
+        room_id: str,
+        stream_ordering: int,
+    ) -> bool:
+        """Check if the forward extremities in a room have changed since the
+        given stream ordering
+
+        Throws a StoreError if we have since purged the index for
+        stream_orderings from that point.
+        """
+
+        if stream_ordering <= self.stream_ordering_month_ago:  # type: ignore[attr-defined]
+            raise StoreError(400, f"stream_ordering too old {stream_ordering}")
+
+        sql = """
+            SELECT 1 FROM stream_ordering_to_exterm
+            WHERE stream_ordering > ? AND room_id = ?
+            LIMIT 1
+        """
+
+        def have_room_forward_extremities_changed_since_txn(
+            txn: LoggingTransaction,
+        ) -> bool:
+            txn.execute(sql, (stream_ordering, room_id))
+            return txn.fetchone() is not None
+
+        return await self.db_pool.runInteraction(
+            "have_room_forward_extremities_changed_since",
+            have_room_forward_extremities_changed_since_txn,
+        )
+
     @cancellable
     async def get_forward_extremities_for_room_at_stream_ordering(
         self, room_id: str, stream_ordering: int
@@ -1232,10 +1264,17 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             txn.execute(sql, (stream_ordering, room_id))
             return [event_id for event_id, in txn]
 
-        return await self.db_pool.runInteraction(
+        event_ids = await self.db_pool.runInteraction(
             "get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
         )
 
+        # If we didn't find any IDs, then we must have cleared out the
+        # associated `stream_ordering_to_exterm`.
+        if not event_ids:
+            raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,))
+
+        return event_ids
+
     def _get_connected_batch_event_backfill_results_txn(
         self, txn: LoggingTransaction, insertion_event_id: str, limit: int
     ) -> List[BackfillQueueNavigationItem]:
@@ -1664,19 +1703,12 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
     @wrap_as_background_process("delete_old_forward_extrem_cache")
     async def _delete_old_forward_extrem_cache(self) -> None:
         def _delete_old_forward_extrem_cache_txn(txn: LoggingTransaction) -> None:
-            # Delete entries older than a month, while making sure we don't delete
-            # the only entries for a room.
             sql = """
                 DELETE FROM stream_ordering_to_exterm
-                WHERE
-                room_id IN (
-                    SELECT room_id
-                    FROM stream_ordering_to_exterm
-                    WHERE stream_ordering > ?
-                ) AND stream_ordering < ?
+                WHERE stream_ordering < ?
             """
             txn.execute(
-                sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago)  # type: ignore[attr-defined]
+                sql, (self.stream_ordering_month_ago)  # type: ignore[attr-defined]
             )
 
         await self.db_pool.runInteraction(