summary refs log tree commit diff
path: root/synapse/storage/data_stores
diff options
context:
space:
mode:
author: Erik Johnston <erik@matrix.org> 2020-01-29 11:01:32 +0000
committer: GitHub <noreply@github.com> 2020-01-29 11:01:32 +0000
commit: 611215a49cedf8d5f63c53168173763731d02260 (patch)
tree: 8c0ad8ba10fb7fcb3e8f73aca662f5adf190fba5 /synapse/storage/data_stores
parent: Fix bug when querying remote user keys that require a resync. (#6796) (diff)
download: synapse-611215a49cedf8d5f63c53168173763731d02260.tar.xz
Delete current state when server leaves a room (#6792)
Otherwise it's just stale data, which may get deleted later anyway so
can't be relied on. It's also a bit of a shotgun if we're trying to get
the current state of a room we're not in.
Diffstat (limited to 'synapse/storage/data_stores')
-rw-r--r-- synapse/storage/data_stores/main/events.py 183
1 files changed, 111 insertions, 72 deletions
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index ce553566a5..c9d0d68c3a 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -32,6 +32,7 @@ from twisted.internet import defer
 import synapse.metrics
 from synapse.api.constants import EventContentFields, EventTypes
 from synapse.api.errors import SynapseError
+from synapse.api.room_versions import RoomVersions
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.events.utils import prune_event_dict
@@ -468,84 +469,93 @@ class EventsStore(
             to_delete = delta_state.to_delete
             to_insert = delta_state.to_insert
 
-            # First we add entries to the current_state_delta_stream. We
-            # do this before updating the current_state_events table so
-            # that we can use it to calculate the `prev_event_id`. (This
-            # allows us to not have to pull out the existing state
-            # unnecessarily).
-            #
-            # The stream_id for the update is chosen to be the minimum of the stream_ids
-            # for the batch of the events that we are persisting; that means we do not
-            # end up in a situation where workers see events before the
-            # current_state_delta updates.
-            #
-            sql = """
-                INSERT INTO current_state_delta_stream
-                (stream_id, room_id, type, state_key, event_id, prev_event_id)
-                SELECT ?, ?, ?, ?, ?, (
-                    SELECT event_id FROM current_state_events
-                    WHERE room_id = ? AND type = ? AND state_key = ?
+            if delta_state.no_longer_in_room:
+                # Server is no longer in the room so we delete the room from
+                # current_state_events, being careful we've already updated the
+                # rooms.room_version column (which gets populated in a
+                # background task).
+                self._upsert_room_version_txn(txn, room_id)
+
+                # Before deleting we populate the current_state_delta_stream
+                # so that async background tasks get told what happened.
+                sql = """
+                    INSERT INTO current_state_delta_stream
+                        (stream_id, room_id, type, state_key, event_id, prev_event_id)
+                    SELECT ?, room_id, type, state_key, null, event_id
+                        FROM current_state_events
+                        WHERE room_id = ?
+                """
+                txn.execute(sql, (stream_id, room_id))
+
+                self.db.simple_delete_txn(
+                    txn, table="current_state_events", keyvalues={"room_id": room_id},
                 )
-            """
-            txn.executemany(
-                sql,
-                (
-                    (
-                        stream_id,
-                        room_id,
-                        etype,
-                        state_key,
-                        None,
-                        room_id,
-                        etype,
-                        state_key,
+            else:
+                # We're still in the room, so we update the current state as normal.
+
+                # First we add entries to the current_state_delta_stream. We
+                # do this before updating the current_state_events table so
+                # that we can use it to calculate the `prev_event_id`. (This
+                # allows us to not have to pull out the existing state
+                # unnecessarily).
+                #
+                # The stream_id for the update is chosen to be the minimum of the stream_ids
+                # for the batch of the events that we are persisting; that means we do not
+                # end up in a situation where workers see events before the
+                # current_state_delta updates.
+                #
+                sql = """
+                    INSERT INTO current_state_delta_stream
+                    (stream_id, room_id, type, state_key, event_id, prev_event_id)
+                    SELECT ?, ?, ?, ?, ?, (
+                        SELECT event_id FROM current_state_events
+                        WHERE room_id = ? AND type = ? AND state_key = ?
                     )
-                    for etype, state_key in to_delete
-                    # We sanity check that we're deleting rather than updating
-                    if (etype, state_key) not in to_insert
-                ),
-            )
-            txn.executemany(
-                sql,
-                (
+                """
+                txn.executemany(
+                    sql,
                     (
-                        stream_id,
-                        room_id,
-                        etype,
-                        state_key,
-                        ev_id,
-                        room_id,
-                        etype,
-                        state_key,
-                    )
-                    for (etype, state_key), ev_id in iteritems(to_insert)
-                ),
-            )
+                        (
+                            stream_id,
+                            room_id,
+                            etype,
+                            state_key,
+                            to_insert.get((etype, state_key)),
+                            room_id,
+                            etype,
+                            state_key,
+                        )
+                        for etype, state_key in itertools.chain(to_delete, to_insert)
+                    ),
+                )
+                # Now we actually update the current_state_events table
 
-            # Now we actually update the current_state_events table
+                txn.executemany(
+                    "DELETE FROM current_state_events"
+                    " WHERE room_id = ? AND type = ? AND state_key = ?",
+                    (
+                        (room_id, etype, state_key)
+                        for etype, state_key in itertools.chain(to_delete, to_insert)
+                    ),
+                )
 
-            txn.executemany(
-                "DELETE FROM current_state_events"
-                " WHERE room_id = ? AND type = ? AND state_key = ?",
-                (
-                    (room_id, etype, state_key)
-                    for etype, state_key in itertools.chain(to_delete, to_insert)
-                ),
-            )
+                # We include the membership in the current state table, hence we do
+                # a lookup when we insert. This assumes that all events have already
+                # been inserted into room_memberships.
+                txn.executemany(
+                    """INSERT INTO current_state_events
+                        (room_id, type, state_key, event_id, membership)
+                    VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
+                    """,
+                    [
+                        (room_id, key[0], key[1], ev_id, ev_id)
+                        for key, ev_id in iteritems(to_insert)
+                    ],
+                )
 
-            # We include the membership in the current state table, hence we do
-            # a lookup when we insert. This assumes that all events have already
-            # been inserted into room_memberships.
-            txn.executemany(
-                """INSERT INTO current_state_events
-                    (room_id, type, state_key, event_id, membership)
-                VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
-                """,
-                [
-                    (room_id, key[0], key[1], ev_id, ev_id)
-                    for key, ev_id in iteritems(to_insert)
-                ],
-            )
+            # We now update `local_current_membership`. We do this regardless
+            # of whether we're still in the room or not to handle the case where
+            # e.g. we just got banned (where we need to record that fact here).
 
             # Note: Do we really want to delete rows here (that we do not
             # subsequently reinsert below)? While technically correct it means
@@ -601,6 +611,35 @@ class EventsStore(
 
             self._invalidate_state_caches_and_stream(txn, room_id, members_changed)
 
+    def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str):
+        """Update the room version in the database based off current state
+        events.
+
+        This is used when we're about to delete current state and we want to
+        ensure that the `rooms.room_version` column is up to date.
+        """
+
+        sql = """
+            SELECT json FROM event_json
+            INNER JOIN current_state_events USING (room_id, event_id)
+            WHERE room_id = ? AND type = ? AND state_key = ?
+        """
+        txn.execute(sql, (room_id, EventTypes.Create, ""))
+        row = txn.fetchone()
+        if row:
+            event_json = json.loads(row[0])
+            content = event_json.get("content", {})
+            creator = content.get("creator")
+            room_version_id = content.get("room_version", RoomVersions.V1.identifier)
+
+            self.db.simple_upsert_txn(
+                txn,
+                table="rooms",
+                keyvalues={"room_id": room_id},
+                values={"room_version": room_version_id},
+                insertion_values={"is_public": False, "creator": creator},
+            )
+
     def _update_forward_extremities_txn(
         self, txn, new_forward_extremities, max_stream_order
     ):