summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-01-30 17:17:44 +0000
committerGitHub <noreply@github.com>2020-01-30 17:17:44 +0000
commit57ad702af0511aff36ca69fb2e9fc3399cce3a8d (patch)
tree76a36a35bb29fac310d75a8e6ac2ec6c2dad8618 /synapse
parentResync remote device list when detected as stale. (#6786) (diff)
downloadsynapse-57ad702af0511aff36ca69fb2e9fc3399cce3a8d.tar.xz
Backgroud update to clean out rooms from current state (#6802)
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql19
-rw-r--r--synapse/storage/data_stores/main/state.py108
2 files changed, 125 insertions, 2 deletions
diff --git a/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql b/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql
new file mode 100644
index 0000000000..a133d87a19
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql
@@ -0,0 +1,19 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add background update to go and delete current state events for rooms the
+-- server is no longer in.
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('delete_old_current_state_events', '{}');
diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py
index bd7b0276f1..9b6f68e777 100644
--- a/synapse/storage/data_stores/main/state.py
+++ b/synapse/storage/data_stores/main/state.py
@@ -21,12 +21,13 @@ from six import iteritems
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import NotFoundError
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
+from synapse.storage.data_stores.main.roommember import RoomMemberWorkerStore
 from synapse.storage.database import Database
 from synapse.storage.state import StateFilter
 from synapse.util.caches import intern_string
@@ -300,14 +301,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         return set(row["state_group"] for row in rows)
 
 
-class MainStateBackgroundUpdateStore(SQLBaseStore):
+class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
 
     CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
     EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
+    DELETE_CURRENT_STATE_UPDATE_NAME = "delete_old_current_state_events"
 
     def __init__(self, database: Database, db_conn, hs):
         super(MainStateBackgroundUpdateStore, self).__init__(database, db_conn, hs)
 
+        self.server_name = hs.hostname
+
         self.db.updates.register_background_index_update(
             self.CURRENT_STATE_INDEX_UPDATE_NAME,
             index_name="current_state_events_member_index",
@@ -321,6 +325,106 @@ class MainStateBackgroundUpdateStore(SQLBaseStore):
             table="event_to_state_groups",
             columns=["state_group"],
         )
+        self.db.updates.register_background_update_handler(
+            self.DELETE_CURRENT_STATE_UPDATE_NAME, self._background_remove_left_rooms,
+        )
+
+    async def _background_remove_left_rooms(self, progress, batch_size):
+        """Background update to delete rows from `current_state_events` and
+        `event_forward_extremities` tables of rooms that the server is no
+        longer joined to.
+        """
+
+        last_room_id = progress.get("last_room_id", "")
+
+        def _background_remove_left_rooms_txn(txn):
+            sql = """
+                SELECT DISTINCT room_id FROM current_state_events
+                WHERE room_id > ? ORDER BY room_id LIMIT ?
+            """
+
+            txn.execute(sql, (last_room_id, batch_size))
+            room_ids = list(row[0] for row in txn)
+            if not room_ids:
+                return True, set()
+
+            sql = """
+                SELECT room_id
+                FROM current_state_events
+                WHERE
+                    room_id > ? AND room_id <= ?
+                    AND type = 'm.room.member'
+                    AND membership = 'join'
+                    AND state_key LIKE ?
+                GROUP BY room_id
+            """
+
+            txn.execute(sql, (last_room_id, room_ids[-1], "%:" + self.server_name))
+
+            joined_room_ids = set(row[0] for row in txn)
+
+            left_rooms = set(room_ids) - joined_room_ids
+
+            # First we get all users that we still think were joined to the
+            # room. This is so that we can mark those device lists as
+            # potentially stale, since there may have been a period where the
+            # server didn't share a room with the remote user and therefore may
+            # have missed any device updates.
+            rows = self.db.simple_select_many_txn(
+                txn,
+                table="current_state_events",
+                column="room_id",
+                iterable=left_rooms,
+                keyvalues={"type": EventTypes.Member, "membership": Membership.JOIN},
+                retcols=("state_key",),
+            )
+
+            potentially_left_users = set(row["state_key"] for row in rows)
+
+            # Now lets actually delete the rooms from the DB.
+            self.db.simple_delete_many_txn(
+                txn,
+                table="current_state_events",
+                column="room_id",
+                iterable=left_rooms,
+                keyvalues={},
+            )
+
+            self.db.simple_delete_many_txn(
+                txn,
+                table="event_forward_extremities",
+                column="room_id",
+                iterable=left_rooms,
+                keyvalues={},
+            )
+
+            self.db.updates._background_update_progress_txn(
+                txn,
+                self.DELETE_CURRENT_STATE_UPDATE_NAME,
+                {"last_room_id": room_ids[-1]},
+            )
+
+            return False, potentially_left_users
+
+        finished, potentially_left_users = await self.db.runInteraction(
+            "_background_remove_left_rooms", _background_remove_left_rooms_txn
+        )
+
+        if finished:
+            await self.db.updates._end_background_update(
+                self.DELETE_CURRENT_STATE_UPDATE_NAME
+            )
+
+        # Now go and check if we still share a room with the remote users in
+        # the deleted rooms. If not mark their device lists as stale.
+        joined_users = await self.get_users_server_still_shares_room_with(
+            potentially_left_users
+        )
+
+        for user_id in potentially_left_users - joined_users:
+            await self.mark_remote_user_device_list_as_unsubscribed(user_id)
+
+        return batch_size
 
 
 class StateStore(StateGroupWorkerStore, MainStateBackgroundUpdateStore):