summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/rest/admin/rooms.py5
-rw-r--r--synapse/storage/databases/main/events_forward_extremities.py49
2 files changed, 53 insertions, 1 deletions
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 1f7b7daea9..76f8603821 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -543,6 +543,11 @@ class ForwardExtremitiesRestServlet(RestServlet):
 
         room_id = await self.resolve_room_id(room_identifier)
 
+        deleted_count = await self.store.delete_forward_extremities_for_room(room_id)
+        return 200, {
+            "deleted": deleted_count,
+        }
+
     async def on_GET(self, request, room_identifier):
         requester = await self.auth.get_user_by_req(request)
         await assert_user_is_admin(self.auth, requester.user)
diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py
index 250a424cc0..cc684a94fe 100644
--- a/synapse/storage/databases/main/events_forward_extremities.py
+++ b/synapse/storage/databases/main/events_forward_extremities.py
@@ -4,7 +4,54 @@ from synapse.storage._base import SQLBaseStore
 
 
 class EventForwardExtremitiesStore(SQLBaseStore):
+
+    async def delete_forward_extremities_for_room(self, room_id: str) -> int:
+        """Delete any extra forward extremities for a room.
+
+        Returns count deleted.
+        """
+        def delete_forward_extremities_for_room_txn(txn):
+            # First we need to get the event_id to not delete
+            sql = (
+                "SELECT "
+                "   last_value(event_id) OVER w AS event_id"
+                "   FROM event_forward_extremities"
+                "   NATURAL JOIN events"
+                " where room_id = ?"
+                "   WINDOW w AS ("
+                "   PARTITION BY room_id"
+                "       ORDER BY stream_ordering"
+                "       range between unbounded preceding and unbounded following"
+                "   )"
+                "   ORDER BY stream_ordering"
+            )
+            txn.execute(sql, (room_id,))
+            rows = txn.fetchall()
+
+            # TODO: should this raise a SynapseError instead of better to blow?
+            event_id = rows[0][0]
+
+            # Now delete the extra forward extremities
+            sql = (
+                "DELETE FROM event_forward_extremities "
+                "WHERE"
+                "   event_id != ?"
+                "   AND room_id = ?"
+            )
+
+            # TODO we should not commit yet
+            txn.execute(sql, (event_id, room_id))
+
+            # TODO flush the cache then commit
+
+            return txn.rowcount
+
+        return await self.db_pool.runInteraction(
+            "delete_forward_extremities_for_room", delete_forward_extremities_for_room_txn,
+        )
+
     async def get_forward_extremities_for_room(self, room_id: str) -> List[Dict]:
+        """Get list of forward extremities for a room."""
         def get_forward_extremities_for_room_txn(txn):
             sql = (
                 "SELECT event_id, state_group FROM event_forward_extremities NATURAL JOIN event_to_state_groups "
@@ -16,5 +63,5 @@ class EventForwardExtremitiesStore(SQLBaseStore):
             return [{"event_id": row[0], "state_group": row[1]} for row in rows]
 
         return await self.db_pool.runInteraction(
-            "get_forward_extremities_for_room", get_forward_extremities_for_room_txn
+            "get_forward_extremities_for_room", get_forward_extremities_for_room_txn,
         )