From 84469bdac773ddb79cfc99f31bbac78d27450682 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 17 Aug 2021 14:02:50 +0100 Subject: Remove the unused public_room_list_stream (#10565) Co-authored-by: Patrick Cloke --- synapse/storage/databases/main/__init__.py | 4 +- synapse/storage/databases/main/room.py | 215 +++++------------------------ synapse/storage/schema/__init__.py | 7 +- 3 files changed, 45 insertions(+), 181 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 8d9f07111d..01b918e12e 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -127,9 +127,6 @@ class DataStore( self._clock = hs.get_clock() self.database_engine = database.engine - self._public_room_id_gen = StreamIdGenerator( - db_conn, "public_room_list_stream", "stream_id" - ) self._device_list_id_gen = StreamIdGenerator( db_conn, "device_lists_stream", @@ -170,6 +167,7 @@ class DataStore( sequence_name="cache_invalidation_stream_seq", writers=[], ) + else: self._cache_id_gen = None diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 443e5f3315..c7a1c1e8d9 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -890,55 +890,6 @@ class RoomWorkerStore(SQLBaseStore): return total_media_quarantined - async def get_all_new_public_rooms( - self, instance_name: str, last_id: int, current_id: int, limit: int - ) -> Tuple[List[Tuple[int, tuple]], int, bool]: - """Get updates for public rooms replication stream. - - Args: - instance_name: The writer we want to fetch updates from. Unused - here since there is only ever one writer. - last_id: The token to fetch updates from. Exclusive. - current_id: The token to fetch updates up to. Inclusive. - limit: The requested limit for the number of rows to return. The - function may return more or fewer rows. - - Returns: - A tuple consisting of: the updates, a token to use to fetch - subsequent updates, and whether we returned fewer rows than exists - between the requested tokens due to the limit. - - The token returned can be used in a subsequent call to this - function to get further updatees. - - The updates are a list of 2-tuples of stream ID and the row data - """ - if last_id == current_id: - return [], current_id, False - - def get_all_new_public_rooms(txn): - sql = """ - SELECT stream_id, room_id, visibility, appservice_id, network_id - FROM public_room_list_stream - WHERE stream_id > ? AND stream_id <= ? - ORDER BY stream_id ASC - LIMIT ? - """ - - txn.execute(sql, (last_id, current_id, limit)) - updates = [(row[0], row[1:]) for row in txn] - limited = False - upto_token = current_id - if len(updates) >= limit: - upto_token = updates[-1][0] - limited = True - - return updates, upto_token, limited - - return await self.db_pool.runInteraction( - "get_all_new_public_rooms", get_all_new_public_rooms - ) - async def get_rooms_for_retention_period_in_range( self, min_ms: Optional[int], max_ms: Optional[int], include_null: bool = False ) -> Dict[str, dict]: @@ -1410,34 +1361,17 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): StoreError if the room could not be stored. """ try: - - def store_room_txn(txn, next_id): - self.db_pool.simple_insert_txn( - txn, - "rooms", - { - "room_id": room_id, - "creator": room_creator_user_id, - "is_public": is_public, - "room_version": room_version.identifier, - "has_auth_chain_index": True, - }, - ) - if is_public: - self.db_pool.simple_insert_txn( - txn, - table="public_room_list_stream", - values={ - "stream_id": next_id, - "room_id": room_id, - "visibility": is_public, - }, - ) - - async with self._public_room_id_gen.get_next() as next_id: - await self.db_pool.runInteraction( - "store_room_txn", store_room_txn, next_id - ) + await self.db_pool.simple_insert( + "rooms", + { + "room_id": room_id, + "creator": room_creator_user_id, + "is_public": is_public, + "room_version": room_version.identifier, + "has_auth_chain_index": True, + }, + desc="store_room", + ) except Exception as e: logger.error("store_room with room_id=%s failed: %s", room_id, e) raise StoreError(500, "Problem creating room.") @@ -1470,49 +1404,14 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): lock=False, ) - async def set_room_is_public(self, room_id, is_public): - def set_room_is_public_txn(txn, next_id): - self.db_pool.simple_update_one_txn( - txn, - table="rooms", - keyvalues={"room_id": room_id}, - updatevalues={"is_public": is_public}, - ) - - entries = self.db_pool.simple_select_list_txn( - txn, - table="public_room_list_stream", - keyvalues={ - "room_id": room_id, - "appservice_id": None, - "network_id": None, - }, - retcols=("stream_id", "visibility"), - ) - - entries.sort(key=lambda r: r["stream_id"]) - - add_to_stream = True - if entries: - add_to_stream = bool(entries[-1]["visibility"]) != is_public - - if add_to_stream: - self.db_pool.simple_insert_txn( - txn, - table="public_room_list_stream", - values={ - "stream_id": next_id, - "room_id": room_id, - "visibility": is_public, - "appservice_id": None, - "network_id": None, - }, - ) + async def set_room_is_public(self, room_id: str, is_public: bool) -> None: + await self.db_pool.simple_update_one( + table="rooms", + keyvalues={"room_id": room_id}, + updatevalues={"is_public": is_public}, + desc="set_room_is_public", + ) - async with self._public_room_id_gen.get_next() as next_id: - await self.db_pool.runInteraction( - "set_room_is_public", set_room_is_public_txn, next_id - ) self.hs.get_notifier().on_new_replication_data() async def set_room_is_public_appservice( @@ -1533,68 +1432,33 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): list. """ - def set_room_is_public_appservice_txn(txn, next_id): - if is_public: - try: - self.db_pool.simple_insert_txn( - txn, - table="appservice_room_list", - values={ - "appservice_id": appservice_id, - "network_id": network_id, - "room_id": room_id, - }, - ) - except self.database_engine.module.IntegrityError: - # We've already inserted, nothing to do. - return - else: - self.db_pool.simple_delete_txn( - txn, - table="appservice_room_list", - keyvalues={ - "appservice_id": appservice_id, - "network_id": network_id, - "room_id": room_id, - }, - ) - - entries = self.db_pool.simple_select_list_txn( - txn, - table="public_room_list_stream", + if is_public: + await self.db_pool.simple_upsert( + table="appservice_room_list", keyvalues={ + "appservice_id": appservice_id, + "network_id": network_id, "room_id": room_id, + }, + values={}, + insertion_values={ "appservice_id": appservice_id, "network_id": network_id, + "room_id": room_id, }, - retcols=("stream_id", "visibility"), + desc="set_room_is_public_appservice_true", ) - - entries.sort(key=lambda r: r["stream_id"]) - - add_to_stream = True - if entries: - add_to_stream = bool(entries[-1]["visibility"]) != is_public - - if add_to_stream: - self.db_pool.simple_insert_txn( - txn, - table="public_room_list_stream", - values={ - "stream_id": next_id, - "room_id": room_id, - "visibility": is_public, - "appservice_id": appservice_id, - "network_id": network_id, - }, - ) - - async with self._public_room_id_gen.get_next() as next_id: - await self.db_pool.runInteraction( - "set_room_is_public_appservice", - set_room_is_public_appservice_txn, - next_id, + else: + await self.db_pool.simple_delete( + table="appservice_room_list", + keyvalues={ + "appservice_id": appservice_id, + "network_id": network_id, + "room_id": room_id, + }, + desc="set_room_is_public_appservice_false", ) + self.hs.get_notifier().on_new_replication_data() async def add_event_report( @@ -1787,9 +1651,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): "get_event_reports_paginate", _get_event_reports_paginate_txn ) - def get_current_public_room_stream_id(self): - return self._public_room_id_gen.get_current_token() - async def block_room(self, room_id: str, user_id: str) -> None: """Marks the room as blocked. Can be called multiple times. diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 7e0687e197..a5bc0ee8a5 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 62 +SCHEMA_VERSION = 63 """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -25,6 +25,11 @@ for more information on how this works. Changes in SCHEMA_VERSION = 61: - The `user_stats_historical` and `room_stats_historical` tables are not written and are not read (previously, they were written but not read). + +Changes in SCHEMA_VERSION = 63: + - The `public_room_list_stream` table is not written nor read to + (previously, it was written and read to, but not for any significant purpose). + https://github.com/matrix-org/synapse/pull/10565 """ -- cgit 1.4.1 From 703e3a9e853b7c2212045ec52eb6b2c6e370c6f9 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 17 Aug 2021 14:33:16 +0100 Subject: Allow /createRoom to be run on workers (#10564) Fixes https://github.com/matrix-org/synapse/issues/7867 --- changelog.d/10564.feature | 1 + docs/workers.md | 1 + synapse/rest/client/room.py | 2 +- synapse/storage/databases/main/room.py | 68 +++++++++++++++++----------------- 4 files changed, 37 insertions(+), 35 deletions(-) create mode 100644 changelog.d/10564.feature (limited to 'synapse/storage') diff --git a/changelog.d/10564.feature b/changelog.d/10564.feature new file mode 100644 index 0000000000..4de32240b2 --- /dev/null +++ b/changelog.d/10564.feature @@ -0,0 +1 @@ +Add support for routing `/createRoom` to workers. diff --git a/docs/workers.md b/docs/workers.md index d8672324c3..1657dfc759 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -214,6 +214,7 @@ expressions: ^/_matrix/federation/v1/send/ # Client API requests + ^/_matrix/client/(api/v1|r0|unstable)/createRoom$ ^/_matrix/client/(api/v1|r0|unstable)/publicRooms$ ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$ ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$ diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index ed238b2141..c5c54564be 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -1141,10 +1141,10 @@ def register_servlets(hs: "HomeServer", http_server, is_worker=False): JoinedRoomsRestServlet(hs).register(http_server) RoomAliasListServlet(hs).register(http_server) SearchRestServlet(hs).register(http_server) + RoomCreateRestServlet(hs).register(http_server) # Some servlets only get registered for the main process. if not is_worker: - RoomCreateRestServlet(hs).register(http_server) RoomForgetRestServlet(hs).register(http_server) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index c7a1c1e8d9..f98b892598 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -73,6 +73,40 @@ class RoomWorkerStore(SQLBaseStore): self.config = hs.config + async def store_room( + self, + room_id: str, + room_creator_user_id: str, + is_public: bool, + room_version: RoomVersion, + ): + """Stores a room. + + Args: + room_id: The desired room ID, can be None. + room_creator_user_id: The user ID of the room creator. + is_public: True to indicate that this room should appear in + public room lists. + room_version: The version of the room + Raises: + StoreError if the room could not be stored. + """ + try: + await self.db_pool.simple_insert( + "rooms", + { + "room_id": room_id, + "creator": room_creator_user_id, + "is_public": is_public, + "room_version": room_version.identifier, + "has_auth_chain_index": True, + }, + desc="store_room", + ) + except Exception as e: + logger.error("store_room with room_id=%s failed: %s", room_id, e) + raise StoreError(500, "Problem creating room.") + async def get_room(self, room_id: str) -> dict: """Retrieve a room. @@ -1342,40 +1376,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): lock=False, ) - async def store_room( - self, - room_id: str, - room_creator_user_id: str, - is_public: bool, - room_version: RoomVersion, - ): - """Stores a room. - - Args: - room_id: The desired room ID, can be None. - room_creator_user_id: The user ID of the room creator. - is_public: True to indicate that this room should appear in - public room lists. - room_version: The version of the room - Raises: - StoreError if the room could not be stored. - """ - try: - await self.db_pool.simple_insert( - "rooms", - { - "room_id": room_id, - "creator": room_creator_user_id, - "is_public": is_public, - "room_version": room_version.identifier, - "has_auth_chain_index": True, - }, - desc="store_room", - ) - except Exception as e: - logger.error("store_room with room_id=%s failed: %s", room_id, e) - raise StoreError(500, "Problem creating room.") - async def maybe_store_room_on_outlier_membership( self, room_id: str, room_version: RoomVersion ): -- cgit 1.4.1