diff --git a/changelog.d/6792.misc b/changelog.d/6792.misc
new file mode 100644
index 0000000000..fa31d509b3
--- /dev/null
+++ b/changelog.d/6792.misc
@@ -0,0 +1 @@
+Delete current state from the database when server leaves a room.
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index ce553566a5..c9d0d68c3a 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -32,6 +32,7 @@ from twisted.internet import defer
import synapse.metrics
from synapse.api.constants import EventContentFields, EventTypes
from synapse.api.errors import SynapseError
+from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event_dict
@@ -468,84 +469,93 @@ class EventsStore(
to_delete = delta_state.to_delete
to_insert = delta_state.to_insert
- # First we add entries to the current_state_delta_stream. We
- # do this before updating the current_state_events table so
- # that we can use it to calculate the `prev_event_id`. (This
- # allows us to not have to pull out the existing state
- # unnecessarily).
- #
- # The stream_id for the update is chosen to be the minimum of the stream_ids
- # for the batch of the events that we are persisting; that means we do not
- # end up in a situation where workers see events before the
- # current_state_delta updates.
- #
- sql = """
- INSERT INTO current_state_delta_stream
- (stream_id, room_id, type, state_key, event_id, prev_event_id)
- SELECT ?, ?, ?, ?, ?, (
- SELECT event_id FROM current_state_events
- WHERE room_id = ? AND type = ? AND state_key = ?
+ if delta_state.no_longer_in_room:
+ # Server is no longer in the room so we delete the room from
+ # current_state_events, being careful we've already updated the
+ # rooms.room_version column (which gets populated in a
+ # background task).
+ self._upsert_room_version_txn(txn, room_id)
+
+ # Before deleting we populate the current_state_delta_stream
+ # so that async background tasks get told what happened.
+ sql = """
+ INSERT INTO current_state_delta_stream
+ (stream_id, room_id, type, state_key, event_id, prev_event_id)
+ SELECT ?, room_id, type, state_key, null, event_id
+ FROM current_state_events
+ WHERE room_id = ?
+ """
+ txn.execute(sql, (stream_id, room_id))
+
+ self.db.simple_delete_txn(
+ txn, table="current_state_events", keyvalues={"room_id": room_id},
)
- """
- txn.executemany(
- sql,
- (
- (
- stream_id,
- room_id,
- etype,
- state_key,
- None,
- room_id,
- etype,
- state_key,
+ else:
+ # We're still in the room, so we update the current state as normal.
+
+ # First we add entries to the current_state_delta_stream. We
+ # do this before updating the current_state_events table so
+ # that we can use it to calculate the `prev_event_id`. (This
+ # allows us to not have to pull out the existing state
+ # unnecessarily).
+ #
+ # The stream_id for the update is chosen to be the minimum of the stream_ids
+ # for the batch of the events that we are persisting; that means we do not
+ # end up in a situation where workers see events before the
+ # current_state_delta updates.
+ #
+ sql = """
+ INSERT INTO current_state_delta_stream
+ (stream_id, room_id, type, state_key, event_id, prev_event_id)
+ SELECT ?, ?, ?, ?, ?, (
+ SELECT event_id FROM current_state_events
+ WHERE room_id = ? AND type = ? AND state_key = ?
)
- for etype, state_key in to_delete
- # We sanity check that we're deleting rather than updating
- if (etype, state_key) not in to_insert
- ),
- )
- txn.executemany(
- sql,
- (
+ """
+ txn.executemany(
+ sql,
(
- stream_id,
- room_id,
- etype,
- state_key,
- ev_id,
- room_id,
- etype,
- state_key,
- )
- for (etype, state_key), ev_id in iteritems(to_insert)
- ),
- )
+ (
+ stream_id,
+ room_id,
+ etype,
+ state_key,
+ to_insert.get((etype, state_key)),
+ room_id,
+ etype,
+ state_key,
+ )
+ for etype, state_key in itertools.chain(to_delete, to_insert)
+ ),
+ )
+ # Now we actually update the current_state_events table
- # Now we actually update the current_state_events table
+ txn.executemany(
+ "DELETE FROM current_state_events"
+ " WHERE room_id = ? AND type = ? AND state_key = ?",
+ (
+ (room_id, etype, state_key)
+ for etype, state_key in itertools.chain(to_delete, to_insert)
+ ),
+ )
- txn.executemany(
- "DELETE FROM current_state_events"
- " WHERE room_id = ? AND type = ? AND state_key = ?",
- (
- (room_id, etype, state_key)
- for etype, state_key in itertools.chain(to_delete, to_insert)
- ),
- )
+ # We include the membership in the current state table, hence we do
+ # a lookup when we insert. This assumes that all events have already
+ # been inserted into room_memberships.
+ txn.executemany(
+ """INSERT INTO current_state_events
+ (room_id, type, state_key, event_id, membership)
+ VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
+ """,
+ [
+ (room_id, key[0], key[1], ev_id, ev_id)
+ for key, ev_id in iteritems(to_insert)
+ ],
+ )
- # We include the membership in the current state table, hence we do
- # a lookup when we insert. This assumes that all events have already
- # been inserted into room_memberships.
- txn.executemany(
- """INSERT INTO current_state_events
- (room_id, type, state_key, event_id, membership)
- VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
- """,
- [
- (room_id, key[0], key[1], ev_id, ev_id)
- for key, ev_id in iteritems(to_insert)
- ],
- )
+ # We now update `local_current_membership`. We do this regardless
+ # of whether we're still in the room or not to handle the case where
+ # e.g. we just got banned (where we need to record that fact here).
# Note: Do we really want to delete rows here (that we do not
# subsequently reinsert below)? While technically correct it means
@@ -601,6 +611,35 @@ class EventsStore(
self._invalidate_state_caches_and_stream(txn, room_id, members_changed)
+ def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str):
+ """Update the room version in the database based off current state
+ events.
+
+ This is used when we're about to delete current state and we want to
+ ensure that the `rooms.room_version` column is up to date.
+ """
+
+ sql = """
+ SELECT json FROM event_json
+ INNER JOIN current_state_events USING (room_id, event_id)
+ WHERE room_id = ? AND type = ? AND state_key = ?
+ """
+ txn.execute(sql, (room_id, EventTypes.Create, ""))
+ row = txn.fetchone()
+ if row:
+ event_json = json.loads(row[0])
+ content = event_json.get("content", {})
+ creator = content.get("creator")
+ room_version_id = content.get("room_version", RoomVersions.V1.identifier)
+
+ self.db.simple_upsert_txn(
+ txn,
+ table="rooms",
+ keyvalues={"room_id": room_id},
+ values={"room_version": room_version_id},
+ insertion_values={"is_public": False, "creator": creator},
+ )
+
def _update_forward_extremities_txn(
self, txn, new_forward_extremities, max_stream_order
):
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 368c457321..d060c8b992 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import itertools
import logging
from collections import deque, namedtuple
from typing import Iterable, List, Optional, Tuple
@@ -27,7 +28,7 @@ from prometheus_client import Counter, Histogram
from twisted.internet import defer
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
@@ -72,17 +73,20 @@ stale_forward_extremities_counter = Histogram(
)
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True)
class DeltaState:
"""Deltas to use to update the `current_state_events` table.
Attributes:
to_delete: List of type/state_keys to delete from current state
to_insert: Map of state to upsert into current state
+ no_longer_in_room: The server is not longer in the room, so the room
+ should e.g. be removed from `current_state_events` table.
"""
to_delete = attr.ib(type=List[Tuple[str, str]])
to_insert = attr.ib(type=StateMap[str])
+ no_longer_in_room = attr.ib(type=bool, default=False)
class _EventPeristenceQueue(object):
@@ -396,11 +400,12 @@ class EventsPersistenceStorage(object):
# If either are not None then there has been a change,
# and we need to work out the delta (or use that
# given)
+ delta = None
if delta_ids is not None:
# If there is a delta we know that we've
# only added or replaced state, never
# removed keys entirely.
- state_delta_for_room[room_id] = DeltaState([], delta_ids)
+ delta = DeltaState([], delta_ids)
elif current_state is not None:
with Measure(
self._clock, "persist_events.calculate_state_delta"
@@ -408,6 +413,22 @@ class EventsPersistenceStorage(object):
delta = await self._calculate_state_delta(
room_id, current_state
)
+
+ if delta:
+ # If we have a change of state then lets check
+ # whether we're actually still a member of the room,
+ # or if our last user left. If we're no longer in
+ # the room then we delete the current state and
+ # extremities.
+ is_still_joined = await self._is_server_still_joined(
+ room_id, ev_ctx_rm, delta, current_state
+ )
+ if not is_still_joined:
+ logger.info("Server no longer in room %s", room_id)
+ latest_event_ids = []
+ current_state = {}
+ delta.no_longer_in_room = True
+
state_delta_for_room[room_id] = delta
# If we have the current_state then lets prefill
@@ -660,3 +681,65 @@ class EventsPersistenceStorage(object):
}
return DeltaState(to_delete=to_delete, to_insert=to_insert)
+
+ async def _is_server_still_joined(
+ self,
+ room_id: str,
+ ev_ctx_rm: List[Tuple[FrozenEvent, EventContext]],
+ delta: DeltaState,
+ current_state: Optional[StateMap[str]],
+ ) -> bool:
+ """Check if the server will still be joined after the given events have
+ been persised.
+
+ Args:
+ room_id
+ ev_ctx_rm
+ delta: The delta of current state between what is in the database
+ and what the new current state will be.
+ current_state: The new current state if it already been calculated,
+ otherwise None.
+ """
+
+ if not any(
+ self.is_mine_id(state_key)
+ for typ, state_key in itertools.chain(delta.to_delete, delta.to_insert)
+ if typ == EventTypes.Member
+ ):
+ # There have been no changes to membership of our users, so nothing
+ # has changed and we assume we're still in the room.
+ return True
+
+ # Check if any of the given events are a local join that appear in the
+ # current state
+ for (typ, state_key), event_id in delta.to_insert.items():
+ if typ != EventTypes.Member or not self.is_mine_id(state_key):
+ continue
+
+ for event, _ in ev_ctx_rm:
+ if event_id == event.event_id:
+ if event.membership == Membership.JOIN:
+ return True
+
+ # There's been a change of membership but we don't have a local join
+ # event in the new events, so we need to check the full state.
+ if current_state is None:
+ current_state = await self.main_store.get_current_state_ids(room_id)
+ current_state = dict(current_state)
+ for key in delta.to_delete:
+ current_state.pop(key, None)
+
+ current_state.update(delta.to_insert)
+
+ event_ids = [
+ event_id
+ for (typ, state_key,), event_id in current_state.items()
+ if typ == EventTypes.Member and self.is_mine_id(state_key)
+ ]
+
+ rows = await self.main_store.get_membership_from_event_ids(event_ids)
+ is_still_joined = any(row["membership"] == Membership.JOIN for row in rows)
+ if is_still_joined:
+ return True
+ else:
+ return False
|