diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 443e5f3315..6e7312266d 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -19,9 +19,10 @@ from abc import abstractmethod
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
-from synapse.api.constants import EventTypes, JoinRules
+from synapse.api.constants import EventContentFields, EventTypes, JoinRules
from synapse.api.errors import StoreError
from synapse.api.room_versions import RoomVersion, RoomVersions
+from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.search import SearchStore
@@ -73,6 +74,40 @@ class RoomWorkerStore(SQLBaseStore):
self.config = hs.config
+ async def store_room(
+ self,
+ room_id: str,
+ room_creator_user_id: str,
+ is_public: bool,
+ room_version: RoomVersion,
+ ):
+ """Stores a room.
+
+ Args:
+ room_id: The desired room ID, can be None.
+ room_creator_user_id: The user ID of the room creator.
+ is_public: True to indicate that this room should appear in
+ public room lists.
+ room_version: The version of the room
+ Raises:
+ StoreError if the room could not be stored.
+ """
+ try:
+ await self.db_pool.simple_insert(
+ "rooms",
+ {
+ "room_id": room_id,
+ "creator": room_creator_user_id,
+ "is_public": is_public,
+ "room_version": room_version.identifier,
+ "has_auth_chain_index": True,
+ },
+ desc="store_room",
+ )
+ except Exception as e:
+ logger.error("store_room with room_id=%s failed: %s", room_id, e)
+ raise StoreError(500, "Problem creating room.")
+
async def get_room(self, room_id: str) -> dict:
"""Retrieve a room.
@@ -890,55 +925,6 @@ class RoomWorkerStore(SQLBaseStore):
return total_media_quarantined
- async def get_all_new_public_rooms(
- self, instance_name: str, last_id: int, current_id: int, limit: int
- ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
- """Get updates for public rooms replication stream.
-
- Args:
- instance_name: The writer we want to fetch updates from. Unused
- here since there is only ever one writer.
- last_id: The token to fetch updates from. Exclusive.
- current_id: The token to fetch updates up to. Inclusive.
- limit: The requested limit for the number of rows to return. The
- function may return more or fewer rows.
-
- Returns:
- A tuple consisting of: the updates, a token to use to fetch
- subsequent updates, and whether we returned fewer rows than exists
- between the requested tokens due to the limit.
-
- The token returned can be used in a subsequent call to this
- function to get further updatees.
-
- The updates are a list of 2-tuples of stream ID and the row data
- """
- if last_id == current_id:
- return [], current_id, False
-
- def get_all_new_public_rooms(txn):
- sql = """
- SELECT stream_id, room_id, visibility, appservice_id, network_id
- FROM public_room_list_stream
- WHERE stream_id > ? AND stream_id <= ?
- ORDER BY stream_id ASC
- LIMIT ?
- """
-
- txn.execute(sql, (last_id, current_id, limit))
- updates = [(row[0], row[1:]) for row in txn]
- limited = False
- upto_token = current_id
- if len(updates) >= limit:
- upto_token = updates[-1][0]
- limited = True
-
- return updates, upto_token, limited
-
- return await self.db_pool.runInteraction(
- "get_all_new_public_rooms", get_all_new_public_rooms
- )
-
async def get_rooms_for_retention_period_in_range(
self, min_ms: Optional[int], max_ms: Optional[int], include_null: bool = False
) -> Dict[str, dict]:
@@ -1028,6 +1014,7 @@ class _BackgroundUpdates:
ADD_ROOMS_ROOM_VERSION_COLUMN = "add_rooms_room_version_column"
POPULATE_ROOM_DEPTH_MIN_DEPTH2 = "populate_room_depth_min_depth2"
REPLACE_ROOM_DEPTH_MIN_DEPTH = "replace_room_depth_min_depth"
+ POPULATE_ROOMS_CREATOR_COLUMN = "populate_rooms_creator_column"
_REPLACE_ROOM_DEPTH_SQL_COMMANDS = (
@@ -1069,6 +1056,11 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
self._background_replace_room_depth_min_depth,
)
+ self.db_pool.updates.register_background_update_handler(
+ _BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN,
+ self._background_populate_rooms_creator_column,
+ )
+
async def _background_insert_retention(self, progress, batch_size):
"""Retrieves a list of all rooms within a range and inserts an entry for each of
them into the room_retention table.
@@ -1288,7 +1280,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
keyvalues={"room_id": room_id},
retcol="MAX(stream_ordering)",
allow_none=True,
- desc="upsert_room_on_join",
+ desc="has_auth_chain_index_fallback",
)
return max_ordering is None
@@ -1358,6 +1350,65 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
return 0
+ async def _background_populate_rooms_creator_column(
+ self, progress: dict, batch_size: int
+ ):
+ """Background update to go and add creator information to `rooms`
+ table from `current_state_events` table.
+ """
+
+ last_room_id = progress.get("room_id", "")
+
+ def _background_populate_rooms_creator_column_txn(txn: LoggingTransaction):
+ sql = """
+ SELECT room_id, json FROM event_json
+ INNER JOIN rooms AS room USING (room_id)
+ INNER JOIN current_state_events AS state_event USING (room_id, event_id)
+ WHERE room_id > ? AND (room.creator IS NULL OR room.creator = '') AND state_event.type = 'm.room.create' AND state_event.state_key = ''
+ ORDER BY room_id
+ LIMIT ?
+ """
+
+ txn.execute(sql, (last_room_id, batch_size))
+ room_id_to_create_event_results = txn.fetchall()
+
+ new_last_room_id = ""
+ for room_id, event_json in room_id_to_create_event_results:
+ event_dict = db_to_json(event_json)
+
+ creator = event_dict.get("content").get(EventContentFields.ROOM_CREATOR)
+
+ self.db_pool.simple_update_txn(
+ txn,
+ table="rooms",
+ keyvalues={"room_id": room_id},
+ updatevalues={"creator": creator},
+ )
+ new_last_room_id = room_id
+
+ if new_last_room_id == "":
+ return True
+
+ self.db_pool.updates._background_update_progress_txn(
+ txn,
+ _BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN,
+ {"room_id": new_last_room_id},
+ )
+
+ return False
+
+ end = await self.db_pool.runInteraction(
+ "_background_populate_rooms_creator_column",
+ _background_populate_rooms_creator_column_txn,
+ )
+
+ if end:
+ await self.db_pool.updates._end_background_update(
+ _BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN
+ )
+
+ return batch_size
+
class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
def __init__(self, database: DatabasePool, db_conn, hs):
@@ -1365,7 +1416,9 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
self.config = hs.config
- async def upsert_room_on_join(self, room_id: str, room_version: RoomVersion):
+ async def upsert_room_on_join(
+ self, room_id: str, room_version: RoomVersion, auth_events: List[EventBase]
+ ):
"""Ensure that the room is stored in the table
Called when we join a room over federation, and overwrites any room version
@@ -1376,6 +1429,24 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
# mark the room as having an auth chain cover index.
has_auth_chain_index = await self.has_auth_chain_index(room_id)
+ create_event = None
+ for e in auth_events:
+ if (e.type, e.state_key) == (EventTypes.Create, ""):
+ create_event = e
+ break
+
+ if create_event is None:
+ # If the state doesn't have a create event then the room is
+ # invalid, and it would fail auth checks anyway.
+ raise StoreError(400, "No create event in state")
+
+ room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
+
+ if not isinstance(room_creator, str):
+ # If the create event does not have a creator then the room is
+ # invalid, and it would fail auth checks anyway.
+ raise StoreError(400, "No creator defined on the create event")
+
await self.db_pool.simple_upsert(
desc="upsert_room_on_join",
table="rooms",
@@ -1383,7 +1454,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
values={"room_version": room_version.identifier},
insertion_values={
"is_public": False,
- "creator": "",
+ "creator": room_creator,
"has_auth_chain_index": has_auth_chain_index,
},
# rooms has a unique constraint on room_id, so no need to lock when doing an
@@ -1391,57 +1462,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
lock=False,
)
- async def store_room(
- self,
- room_id: str,
- room_creator_user_id: str,
- is_public: bool,
- room_version: RoomVersion,
- ):
- """Stores a room.
-
- Args:
- room_id: The desired room ID, can be None.
- room_creator_user_id: The user ID of the room creator.
- is_public: True to indicate that this room should appear in
- public room lists.
- room_version: The version of the room
- Raises:
- StoreError if the room could not be stored.
- """
- try:
-
- def store_room_txn(txn, next_id):
- self.db_pool.simple_insert_txn(
- txn,
- "rooms",
- {
- "room_id": room_id,
- "creator": room_creator_user_id,
- "is_public": is_public,
- "room_version": room_version.identifier,
- "has_auth_chain_index": True,
- },
- )
- if is_public:
- self.db_pool.simple_insert_txn(
- txn,
- table="public_room_list_stream",
- values={
- "stream_id": next_id,
- "room_id": room_id,
- "visibility": is_public,
- },
- )
-
- async with self._public_room_id_gen.get_next() as next_id:
- await self.db_pool.runInteraction(
- "store_room_txn", store_room_txn, next_id
- )
- except Exception as e:
- logger.error("store_room with room_id=%s failed: %s", room_id, e)
- raise StoreError(500, "Problem creating room.")
-
async def maybe_store_room_on_outlier_membership(
self, room_id: str, room_version: RoomVersion
):
@@ -1462,6 +1482,9 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
insertion_values={
"room_version": room_version.identifier,
"is_public": False,
+ # We don't worry about setting the `creator` here because
+ # we don't process any messages in a room while a user is
+ # invited (only after the join).
"creator": "",
"has_auth_chain_index": has_auth_chain_index,
},
@@ -1470,49 +1493,14 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
lock=False,
)
- async def set_room_is_public(self, room_id, is_public):
- def set_room_is_public_txn(txn, next_id):
- self.db_pool.simple_update_one_txn(
- txn,
- table="rooms",
- keyvalues={"room_id": room_id},
- updatevalues={"is_public": is_public},
- )
-
- entries = self.db_pool.simple_select_list_txn(
- txn,
- table="public_room_list_stream",
- keyvalues={
- "room_id": room_id,
- "appservice_id": None,
- "network_id": None,
- },
- retcols=("stream_id", "visibility"),
- )
-
- entries.sort(key=lambda r: r["stream_id"])
-
- add_to_stream = True
- if entries:
- add_to_stream = bool(entries[-1]["visibility"]) != is_public
-
- if add_to_stream:
- self.db_pool.simple_insert_txn(
- txn,
- table="public_room_list_stream",
- values={
- "stream_id": next_id,
- "room_id": room_id,
- "visibility": is_public,
- "appservice_id": None,
- "network_id": None,
- },
- )
+ async def set_room_is_public(self, room_id: str, is_public: bool) -> None:
+ await self.db_pool.simple_update_one(
+ table="rooms",
+ keyvalues={"room_id": room_id},
+ updatevalues={"is_public": is_public},
+ desc="set_room_is_public",
+ )
- async with self._public_room_id_gen.get_next() as next_id:
- await self.db_pool.runInteraction(
- "set_room_is_public", set_room_is_public_txn, next_id
- )
self.hs.get_notifier().on_new_replication_data()
async def set_room_is_public_appservice(
@@ -1533,68 +1521,33 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
list.
"""
- def set_room_is_public_appservice_txn(txn, next_id):
- if is_public:
- try:
- self.db_pool.simple_insert_txn(
- txn,
- table="appservice_room_list",
- values={
- "appservice_id": appservice_id,
- "network_id": network_id,
- "room_id": room_id,
- },
- )
- except self.database_engine.module.IntegrityError:
- # We've already inserted, nothing to do.
- return
- else:
- self.db_pool.simple_delete_txn(
- txn,
- table="appservice_room_list",
- keyvalues={
- "appservice_id": appservice_id,
- "network_id": network_id,
- "room_id": room_id,
- },
- )
-
- entries = self.db_pool.simple_select_list_txn(
- txn,
- table="public_room_list_stream",
+ if is_public:
+ await self.db_pool.simple_upsert(
+ table="appservice_room_list",
keyvalues={
+ "appservice_id": appservice_id,
+ "network_id": network_id,
"room_id": room_id,
+ },
+ values={},
+ insertion_values={
"appservice_id": appservice_id,
"network_id": network_id,
+ "room_id": room_id,
},
- retcols=("stream_id", "visibility"),
+ desc="set_room_is_public_appservice_true",
)
-
- entries.sort(key=lambda r: r["stream_id"])
-
- add_to_stream = True
- if entries:
- add_to_stream = bool(entries[-1]["visibility"]) != is_public
-
- if add_to_stream:
- self.db_pool.simple_insert_txn(
- txn,
- table="public_room_list_stream",
- values={
- "stream_id": next_id,
- "room_id": room_id,
- "visibility": is_public,
- "appservice_id": appservice_id,
- "network_id": network_id,
- },
- )
-
- async with self._public_room_id_gen.get_next() as next_id:
- await self.db_pool.runInteraction(
- "set_room_is_public_appservice",
- set_room_is_public_appservice_txn,
- next_id,
+ else:
+ await self.db_pool.simple_delete(
+ table="appservice_room_list",
+ keyvalues={
+ "appservice_id": appservice_id,
+ "network_id": network_id,
+ "room_id": room_id,
+ },
+ desc="set_room_is_public_appservice_false",
)
+
self.hs.get_notifier().on_new_replication_data()
async def add_event_report(
@@ -1787,9 +1740,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
"get_event_reports_paginate", _get_event_reports_paginate_txn
)
- def get_current_public_room_stream_id(self):
- return self._public_room_id_gen.get_current_token()
-
async def block_room(self, room_id: str, user_id: str) -> None:
"""Marks the room as blocked. Can be called multiple times.
|