summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-08-18 17:02:47 +0100
committerErik Johnston <erik@matrix.org>2021-08-18 17:02:47 +0100
commit78a70a2e0bdff5bdf02ba093eefab0b16ebefd73 (patch)
tree8de7b7805a9840101b00529a50812d6ffcb99cc4 /synapse/storage/databases
parentFix weakref_slot parameter for room member storage attrs. (#10642) (diff)
parentUpdate docs/upgrade.md with new version (diff)
downloadsynapse-78a70a2e0bdff5bdf02ba093eefab0b16ebefd73.tar.xz
Merge branch 'release-v1.41' into develop
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/__init__.py4
-rw-r--r--synapse/storage/databases/main/room.py261
2 files changed, 62 insertions, 203 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 8d9f07111d..01b918e12e 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -127,9 +127,6 @@ class DataStore(
         self._clock = hs.get_clock()
         self.database_engine = database.engine
 
-        self._public_room_id_gen = StreamIdGenerator(
-            db_conn, "public_room_list_stream", "stream_id"
-        )
         self._device_list_id_gen = StreamIdGenerator(
             db_conn,
             "device_lists_stream",
@@ -170,6 +167,7 @@ class DataStore(
                 sequence_name="cache_invalidation_stream_seq",
                 writers=[],
             )
+
         else:
             self._cache_id_gen = None
 
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 443e5f3315..f98b892598 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -73,6 +73,40 @@ class RoomWorkerStore(SQLBaseStore):
 
         self.config = hs.config
 
+    async def store_room(
+        self,
+        room_id: str,
+        room_creator_user_id: str,
+        is_public: bool,
+        room_version: RoomVersion,
+    ):
+        """Stores a room.
+
+        Args:
+            room_id: The desired room ID, can be None.
+            room_creator_user_id: The user ID of the room creator.
+            is_public: True to indicate that this room should appear in
+                public room lists.
+            room_version: The version of the room
+        Raises:
+            StoreError if the room could not be stored.
+        """
+        try:
+            await self.db_pool.simple_insert(
+                "rooms",
+                {
+                    "room_id": room_id,
+                    "creator": room_creator_user_id,
+                    "is_public": is_public,
+                    "room_version": room_version.identifier,
+                    "has_auth_chain_index": True,
+                },
+                desc="store_room",
+            )
+        except Exception as e:
+            logger.error("store_room with room_id=%s failed: %s", room_id, e)
+            raise StoreError(500, "Problem creating room.")
+
     async def get_room(self, room_id: str) -> dict:
         """Retrieve a room.
 
@@ -890,55 +924,6 @@ class RoomWorkerStore(SQLBaseStore):
 
         return total_media_quarantined
 
-    async def get_all_new_public_rooms(
-        self, instance_name: str, last_id: int, current_id: int, limit: int
-    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
-        """Get updates for public rooms replication stream.
-
-        Args:
-            instance_name: The writer we want to fetch updates from. Unused
-                here since there is only ever one writer.
-            last_id: The token to fetch updates from. Exclusive.
-            current_id: The token to fetch updates up to. Inclusive.
-            limit: The requested limit for the number of rows to return. The
-                function may return more or fewer rows.
-
-        Returns:
-            A tuple consisting of: the updates, a token to use to fetch
-            subsequent updates, and whether we returned fewer rows than exists
-            between the requested tokens due to the limit.
-
-            The token returned can be used in a subsequent call to this
-            function to get further updatees.
-
-            The updates are a list of 2-tuples of stream ID and the row data
-        """
-        if last_id == current_id:
-            return [], current_id, False
-
-        def get_all_new_public_rooms(txn):
-            sql = """
-                SELECT stream_id, room_id, visibility, appservice_id, network_id
-                FROM public_room_list_stream
-                WHERE stream_id > ? AND stream_id <= ?
-                ORDER BY stream_id ASC
-                LIMIT ?
-            """
-
-            txn.execute(sql, (last_id, current_id, limit))
-            updates = [(row[0], row[1:]) for row in txn]
-            limited = False
-            upto_token = current_id
-            if len(updates) >= limit:
-                upto_token = updates[-1][0]
-                limited = True
-
-            return updates, upto_token, limited
-
-        return await self.db_pool.runInteraction(
-            "get_all_new_public_rooms", get_all_new_public_rooms
-        )
-
     async def get_rooms_for_retention_period_in_range(
         self, min_ms: Optional[int], max_ms: Optional[int], include_null: bool = False
     ) -> Dict[str, dict]:
@@ -1391,57 +1376,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
             lock=False,
         )
 
-    async def store_room(
-        self,
-        room_id: str,
-        room_creator_user_id: str,
-        is_public: bool,
-        room_version: RoomVersion,
-    ):
-        """Stores a room.
-
-        Args:
-            room_id: The desired room ID, can be None.
-            room_creator_user_id: The user ID of the room creator.
-            is_public: True to indicate that this room should appear in
-                public room lists.
-            room_version: The version of the room
-        Raises:
-            StoreError if the room could not be stored.
-        """
-        try:
-
-            def store_room_txn(txn, next_id):
-                self.db_pool.simple_insert_txn(
-                    txn,
-                    "rooms",
-                    {
-                        "room_id": room_id,
-                        "creator": room_creator_user_id,
-                        "is_public": is_public,
-                        "room_version": room_version.identifier,
-                        "has_auth_chain_index": True,
-                    },
-                )
-                if is_public:
-                    self.db_pool.simple_insert_txn(
-                        txn,
-                        table="public_room_list_stream",
-                        values={
-                            "stream_id": next_id,
-                            "room_id": room_id,
-                            "visibility": is_public,
-                        },
-                    )
-
-            async with self._public_room_id_gen.get_next() as next_id:
-                await self.db_pool.runInteraction(
-                    "store_room_txn", store_room_txn, next_id
-                )
-        except Exception as e:
-            logger.error("store_room with room_id=%s failed: %s", room_id, e)
-            raise StoreError(500, "Problem creating room.")
-
     async def maybe_store_room_on_outlier_membership(
         self, room_id: str, room_version: RoomVersion
     ):
@@ -1470,49 +1404,14 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
             lock=False,
         )
 
-    async def set_room_is_public(self, room_id, is_public):
-        def set_room_is_public_txn(txn, next_id):
-            self.db_pool.simple_update_one_txn(
-                txn,
-                table="rooms",
-                keyvalues={"room_id": room_id},
-                updatevalues={"is_public": is_public},
-            )
-
-            entries = self.db_pool.simple_select_list_txn(
-                txn,
-                table="public_room_list_stream",
-                keyvalues={
-                    "room_id": room_id,
-                    "appservice_id": None,
-                    "network_id": None,
-                },
-                retcols=("stream_id", "visibility"),
-            )
-
-            entries.sort(key=lambda r: r["stream_id"])
-
-            add_to_stream = True
-            if entries:
-                add_to_stream = bool(entries[-1]["visibility"]) != is_public
-
-            if add_to_stream:
-                self.db_pool.simple_insert_txn(
-                    txn,
-                    table="public_room_list_stream",
-                    values={
-                        "stream_id": next_id,
-                        "room_id": room_id,
-                        "visibility": is_public,
-                        "appservice_id": None,
-                        "network_id": None,
-                    },
-                )
+    async def set_room_is_public(self, room_id: str, is_public: bool) -> None:
+        await self.db_pool.simple_update_one(
+            table="rooms",
+            keyvalues={"room_id": room_id},
+            updatevalues={"is_public": is_public},
+            desc="set_room_is_public",
+        )
 
-        async with self._public_room_id_gen.get_next() as next_id:
-            await self.db_pool.runInteraction(
-                "set_room_is_public", set_room_is_public_txn, next_id
-            )
         self.hs.get_notifier().on_new_replication_data()
 
     async def set_room_is_public_appservice(
@@ -1533,68 +1432,33 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
                 list.
         """
 
-        def set_room_is_public_appservice_txn(txn, next_id):
-            if is_public:
-                try:
-                    self.db_pool.simple_insert_txn(
-                        txn,
-                        table="appservice_room_list",
-                        values={
-                            "appservice_id": appservice_id,
-                            "network_id": network_id,
-                            "room_id": room_id,
-                        },
-                    )
-                except self.database_engine.module.IntegrityError:
-                    # We've already inserted, nothing to do.
-                    return
-            else:
-                self.db_pool.simple_delete_txn(
-                    txn,
-                    table="appservice_room_list",
-                    keyvalues={
-                        "appservice_id": appservice_id,
-                        "network_id": network_id,
-                        "room_id": room_id,
-                    },
-                )
-
-            entries = self.db_pool.simple_select_list_txn(
-                txn,
-                table="public_room_list_stream",
+        if is_public:
+            await self.db_pool.simple_upsert(
+                table="appservice_room_list",
                 keyvalues={
+                    "appservice_id": appservice_id,
+                    "network_id": network_id,
                     "room_id": room_id,
+                },
+                values={},
+                insertion_values={
                     "appservice_id": appservice_id,
                     "network_id": network_id,
+                    "room_id": room_id,
                 },
-                retcols=("stream_id", "visibility"),
+                desc="set_room_is_public_appservice_true",
             )
-
-            entries.sort(key=lambda r: r["stream_id"])
-
-            add_to_stream = True
-            if entries:
-                add_to_stream = bool(entries[-1]["visibility"]) != is_public
-
-            if add_to_stream:
-                self.db_pool.simple_insert_txn(
-                    txn,
-                    table="public_room_list_stream",
-                    values={
-                        "stream_id": next_id,
-                        "room_id": room_id,
-                        "visibility": is_public,
-                        "appservice_id": appservice_id,
-                        "network_id": network_id,
-                    },
-                )
-
-        async with self._public_room_id_gen.get_next() as next_id:
-            await self.db_pool.runInteraction(
-                "set_room_is_public_appservice",
-                set_room_is_public_appservice_txn,
-                next_id,
+        else:
+            await self.db_pool.simple_delete(
+                table="appservice_room_list",
+                keyvalues={
+                    "appservice_id": appservice_id,
+                    "network_id": network_id,
+                    "room_id": room_id,
+                },
+                desc="set_room_is_public_appservice_false",
             )
+
         self.hs.get_notifier().on_new_replication_data()
 
     async def add_event_report(
@@ -1787,9 +1651,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
             "get_event_reports_paginate", _get_event_reports_paginate_txn
         )
 
-    def get_current_public_room_stream_id(self):
-        return self._public_room_id_gen.get_current_token()
-
     async def block_room(self, room_id: str, user_id: str) -> None:
         """Marks the room as blocked. Can be called multiple times.