diff --git a/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql b/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql
new file mode 100644
index 0000000000..a133d87a19
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql
@@ -0,0 +1,19 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add background update to go and delete current state events for rooms the
+-- server is no longer in.
+INSERT into background_updates (update_name, progress_json)
+ VALUES ('delete_old_current_state_events', '{}');
diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py
index bd7b0276f1..9b6f68e777 100644
--- a/synapse/storage/data_stores/main/state.py
+++ b/synapse/storage/data_stores/main/state.py
@@ -21,12 +21,13 @@ from six import iteritems
from twisted.internet import defer
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import NotFoundError
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.storage._base import SQLBaseStore
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
+from synapse.storage.data_stores.main.roommember import RoomMemberWorkerStore
from synapse.storage.database import Database
from synapse.storage.state import StateFilter
from synapse.util.caches import intern_string
@@ -300,14 +301,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
return set(row["state_group"] for row in rows)
-class MainStateBackgroundUpdateStore(SQLBaseStore):
+class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
+ DELETE_CURRENT_STATE_UPDATE_NAME = "delete_old_current_state_events"
def __init__(self, database: Database, db_conn, hs):
super(MainStateBackgroundUpdateStore, self).__init__(database, db_conn, hs)
+ self.server_name = hs.hostname
+
self.db.updates.register_background_index_update(
self.CURRENT_STATE_INDEX_UPDATE_NAME,
index_name="current_state_events_member_index",
@@ -321,6 +325,106 @@ class MainStateBackgroundUpdateStore(SQLBaseStore):
table="event_to_state_groups",
columns=["state_group"],
)
+ self.db.updates.register_background_update_handler(
+ self.DELETE_CURRENT_STATE_UPDATE_NAME, self._background_remove_left_rooms,
+ )
+
+ async def _background_remove_left_rooms(self, progress, batch_size):
+ """Background update to delete rows from `current_state_events` and
+ `event_forward_extremities` tables of rooms that the server is no
+ longer joined to.
+ """
+
+ last_room_id = progress.get("last_room_id", "")
+
+ def _background_remove_left_rooms_txn(txn):
+ sql = """
+ SELECT DISTINCT room_id FROM current_state_events
+ WHERE room_id > ? ORDER BY room_id LIMIT ?
+ """
+
+ txn.execute(sql, (last_room_id, batch_size))
+ room_ids = list(row[0] for row in txn)
+ if not room_ids:
+ return True, set()
+
+ sql = """
+ SELECT room_id
+ FROM current_state_events
+ WHERE
+ room_id > ? AND room_id <= ?
+ AND type = 'm.room.member'
+ AND membership = 'join'
+ AND state_key LIKE ?
+ GROUP BY room_id
+ """
+
+ txn.execute(sql, (last_room_id, room_ids[-1], "%:" + self.server_name))
+
+ joined_room_ids = set(row[0] for row in txn)
+
+ left_rooms = set(room_ids) - joined_room_ids
+
+ # First we get all users that we still think were joined to the
+ # room. This is so that we can mark those device lists as
+ # potentially stale, since there may have been a period where the
+ # server didn't share a room with the remote user and therefore may
+ # have missed any device updates.
+ rows = self.db.simple_select_many_txn(
+ txn,
+ table="current_state_events",
+ column="room_id",
+ iterable=left_rooms,
+ keyvalues={"type": EventTypes.Member, "membership": Membership.JOIN},
+ retcols=("state_key",),
+ )
+
+ potentially_left_users = set(row["state_key"] for row in rows)
+
+ # Now lets actually delete the rooms from the DB.
+ self.db.simple_delete_many_txn(
+ txn,
+ table="current_state_events",
+ column="room_id",
+ iterable=left_rooms,
+ keyvalues={},
+ )
+
+ self.db.simple_delete_many_txn(
+ txn,
+ table="event_forward_extremities",
+ column="room_id",
+ iterable=left_rooms,
+ keyvalues={},
+ )
+
+ self.db.updates._background_update_progress_txn(
+ txn,
+ self.DELETE_CURRENT_STATE_UPDATE_NAME,
+ {"last_room_id": room_ids[-1]},
+ )
+
+ return False, potentially_left_users
+
+ finished, potentially_left_users = await self.db.runInteraction(
+ "_background_remove_left_rooms", _background_remove_left_rooms_txn
+ )
+
+ if finished:
+ await self.db.updates._end_background_update(
+ self.DELETE_CURRENT_STATE_UPDATE_NAME
+ )
+
+ # Now go and check if we still share a room with the remote users in
+ # the deleted rooms. If not mark their device lists as stale.
+ joined_users = await self.get_users_server_still_shares_room_with(
+ potentially_left_users
+ )
+
+ for user_id in potentially_left_users - joined_users:
+ await self.mark_remote_user_device_list_as_unsubscribed(user_id)
+
+ return batch_size
class StateStore(StateGroupWorkerStore, MainStateBackgroundUpdateStore):
|