summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/10565.misc1
-rw-r--r--synapse/app/admin_cmd.py2
-rw-r--r--synapse/app/generic_worker.py4
-rw-r--r--synapse/replication/slave/storage/room.py37
-rw-r--r--synapse/replication/tcp/streams/__init__.py3
-rw-r--r--synapse/replication/tcp/streams/_base.py25
-rw-r--r--synapse/storage/databases/main/__init__.py4
-rw-r--r--synapse/storage/databases/main/room.py215
-rw-r--r--synapse/storage/schema/__init__.py7
9 files changed, 48 insertions, 250 deletions
diff --git a/changelog.d/10565.misc b/changelog.d/10565.misc
new file mode 100644
index 0000000000..06796b61ab
--- /dev/null
+++ b/changelog.d/10565.misc
@@ -0,0 +1 @@
+Remove the unused public rooms replication stream.
\ No newline at end of file
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index 3234d9ebba..7396db93c6 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -38,7 +38,6 @@ from synapse.replication.slave.storage.groups import SlavedGroupServerStore
 from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.room import RoomStore
 from synapse.server import HomeServer
 from synapse.util.logcontext import LoggingContext
 from synapse.util.versionstring import get_version_string
@@ -58,7 +57,6 @@ class AdminCmdSlavedStore(
     SlavedPushRuleStore,
     SlavedEventStore,
     SlavedClientIpStore,
-    RoomStore,
     BaseSlavedStore,
 ):
     pass
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index d7b425a7ab..845e6a8220 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -64,7 +64,6 @@ from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.room import RoomStore
 from synapse.rest.admin import register_servlets_for_media_repo
 from synapse.rest.client import (
     account_data,
@@ -114,6 +113,7 @@ from synapse.storage.databases.main.monthly_active_users import (
     MonthlyActiveUsersWorkerStore,
 )
 from synapse.storage.databases.main.presence import PresenceStore
+from synapse.storage.databases.main.room import RoomWorkerStore
 from synapse.storage.databases.main.search import SearchStore
 from synapse.storage.databases.main.stats import StatsStore
 from synapse.storage.databases.main.transactions import TransactionWorkerStore
@@ -237,7 +237,7 @@ class GenericWorkerSlavedStore(
     ClientIpWorkerStore,
     SlavedEventStore,
     SlavedKeyStore,
-    RoomStore,
+    RoomWorkerStore,
     DirectoryStore,
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
deleted file mode 100644
index 8cc6de3f46..0000000000
--- a/synapse/replication/slave/storage/room.py
+++ /dev/null
@@ -1,37 +0,0 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.replication.tcp.streams import PublicRoomsStream
-from synapse.storage.database import DatabasePool
-from synapse.storage.databases.main.room import RoomWorkerStore
-
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
-
-
-class RoomStore(RoomWorkerStore, BaseSlavedStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
-        super().__init__(database, db_conn, hs)
-        self._public_room_id_gen = SlavedIdTracker(
-            db_conn, "public_room_list_stream", "stream_id"
-        )
-
-    def get_current_public_room_stream_id(self):
-        return self._public_room_id_gen.get_current_token()
-
-    def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == PublicRoomsStream.NAME:
-            self._public_room_id_gen.advance(instance_name, token)
-
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index 4c0023c68a..f41eabd85e 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -32,7 +32,6 @@ from synapse.replication.tcp.streams._base import (
     GroupServerStream,
     PresenceFederationStream,
     PresenceStream,
-    PublicRoomsStream,
     PushersStream,
     PushRulesStream,
     ReceiptsStream,
@@ -57,7 +56,6 @@ STREAMS_MAP = {
         PushRulesStream,
         PushersStream,
         CachesStream,
-        PublicRoomsStream,
         DeviceListsStream,
         ToDeviceStream,
         FederationStream,
@@ -79,7 +77,6 @@ __all__ = [
     "PushRulesStream",
     "PushersStream",
     "CachesStream",
-    "PublicRoomsStream",
     "DeviceListsStream",
     "ToDeviceStream",
     "TagAccountDataStream",
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 3716c41bea..9b905aba9d 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -447,31 +447,6 @@ class CachesStream(Stream):
         )
 
 
-class PublicRoomsStream(Stream):
-    """The public rooms list changed"""
-
-    PublicRoomsStreamRow = namedtuple(
-        "PublicRoomsStreamRow",
-        (
-            "room_id",  # str
-            "visibility",  # str
-            "appservice_id",  # str, optional
-            "network_id",  # str, optional
-        ),
-    )
-
-    NAME = "public_rooms"
-    ROW_TYPE = PublicRoomsStreamRow
-
-    def __init__(self, hs):
-        store = hs.get_datastore()
-        super().__init__(
-            hs.get_instance_name(),
-            current_token_without_instance(store.get_current_public_room_stream_id),
-            store.get_all_new_public_rooms,
-        )
-
-
 class DeviceListsStream(Stream):
     """Either a user has updated their devices or a remote server needs to be
     told about a device update.
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 8d9f07111d..01b918e12e 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -127,9 +127,6 @@ class DataStore(
         self._clock = hs.get_clock()
         self.database_engine = database.engine
 
-        self._public_room_id_gen = StreamIdGenerator(
-            db_conn, "public_room_list_stream", "stream_id"
-        )
         self._device_list_id_gen = StreamIdGenerator(
             db_conn,
             "device_lists_stream",
@@ -170,6 +167,7 @@ class DataStore(
                 sequence_name="cache_invalidation_stream_seq",
                 writers=[],
             )
+
         else:
             self._cache_id_gen = None
 
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 443e5f3315..c7a1c1e8d9 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -890,55 +890,6 @@ class RoomWorkerStore(SQLBaseStore):
 
         return total_media_quarantined
 
-    async def get_all_new_public_rooms(
-        self, instance_name: str, last_id: int, current_id: int, limit: int
-    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
-        """Get updates for public rooms replication stream.
-
-        Args:
-            instance_name: The writer we want to fetch updates from. Unused
-                here since there is only ever one writer.
-            last_id: The token to fetch updates from. Exclusive.
-            current_id: The token to fetch updates up to. Inclusive.
-            limit: The requested limit for the number of rows to return. The
-                function may return more or fewer rows.
-
-        Returns:
-            A tuple consisting of: the updates, a token to use to fetch
-            subsequent updates, and whether we returned fewer rows than exists
-            between the requested tokens due to the limit.
-
-            The token returned can be used in a subsequent call to this
-            function to get further updatees.
-
-            The updates are a list of 2-tuples of stream ID and the row data
-        """
-        if last_id == current_id:
-            return [], current_id, False
-
-        def get_all_new_public_rooms(txn):
-            sql = """
-                SELECT stream_id, room_id, visibility, appservice_id, network_id
-                FROM public_room_list_stream
-                WHERE stream_id > ? AND stream_id <= ?
-                ORDER BY stream_id ASC
-                LIMIT ?
-            """
-
-            txn.execute(sql, (last_id, current_id, limit))
-            updates = [(row[0], row[1:]) for row in txn]
-            limited = False
-            upto_token = current_id
-            if len(updates) >= limit:
-                upto_token = updates[-1][0]
-                limited = True
-
-            return updates, upto_token, limited
-
-        return await self.db_pool.runInteraction(
-            "get_all_new_public_rooms", get_all_new_public_rooms
-        )
-
     async def get_rooms_for_retention_period_in_range(
         self, min_ms: Optional[int], max_ms: Optional[int], include_null: bool = False
     ) -> Dict[str, dict]:
@@ -1410,34 +1361,17 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
             StoreError if the room could not be stored.
         """
         try:
-
-            def store_room_txn(txn, next_id):
-                self.db_pool.simple_insert_txn(
-                    txn,
-                    "rooms",
-                    {
-                        "room_id": room_id,
-                        "creator": room_creator_user_id,
-                        "is_public": is_public,
-                        "room_version": room_version.identifier,
-                        "has_auth_chain_index": True,
-                    },
-                )
-                if is_public:
-                    self.db_pool.simple_insert_txn(
-                        txn,
-                        table="public_room_list_stream",
-                        values={
-                            "stream_id": next_id,
-                            "room_id": room_id,
-                            "visibility": is_public,
-                        },
-                    )
-
-            async with self._public_room_id_gen.get_next() as next_id:
-                await self.db_pool.runInteraction(
-                    "store_room_txn", store_room_txn, next_id
-                )
+            await self.db_pool.simple_insert(
+                "rooms",
+                {
+                    "room_id": room_id,
+                    "creator": room_creator_user_id,
+                    "is_public": is_public,
+                    "room_version": room_version.identifier,
+                    "has_auth_chain_index": True,
+                },
+                desc="store_room",
+            )
         except Exception as e:
             logger.error("store_room with room_id=%s failed: %s", room_id, e)
             raise StoreError(500, "Problem creating room.")
@@ -1470,49 +1404,14 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
             lock=False,
         )
 
-    async def set_room_is_public(self, room_id, is_public):
-        def set_room_is_public_txn(txn, next_id):
-            self.db_pool.simple_update_one_txn(
-                txn,
-                table="rooms",
-                keyvalues={"room_id": room_id},
-                updatevalues={"is_public": is_public},
-            )
-
-            entries = self.db_pool.simple_select_list_txn(
-                txn,
-                table="public_room_list_stream",
-                keyvalues={
-                    "room_id": room_id,
-                    "appservice_id": None,
-                    "network_id": None,
-                },
-                retcols=("stream_id", "visibility"),
-            )
-
-            entries.sort(key=lambda r: r["stream_id"])
-
-            add_to_stream = True
-            if entries:
-                add_to_stream = bool(entries[-1]["visibility"]) != is_public
-
-            if add_to_stream:
-                self.db_pool.simple_insert_txn(
-                    txn,
-                    table="public_room_list_stream",
-                    values={
-                        "stream_id": next_id,
-                        "room_id": room_id,
-                        "visibility": is_public,
-                        "appservice_id": None,
-                        "network_id": None,
-                    },
-                )
+    async def set_room_is_public(self, room_id: str, is_public: bool) -> None:
+        await self.db_pool.simple_update_one(
+            table="rooms",
+            keyvalues={"room_id": room_id},
+            updatevalues={"is_public": is_public},
+            desc="set_room_is_public",
+        )
 
-        async with self._public_room_id_gen.get_next() as next_id:
-            await self.db_pool.runInteraction(
-                "set_room_is_public", set_room_is_public_txn, next_id
-            )
         self.hs.get_notifier().on_new_replication_data()
 
     async def set_room_is_public_appservice(
@@ -1533,68 +1432,33 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
                 list.
         """
 
-        def set_room_is_public_appservice_txn(txn, next_id):
-            if is_public:
-                try:
-                    self.db_pool.simple_insert_txn(
-                        txn,
-                        table="appservice_room_list",
-                        values={
-                            "appservice_id": appservice_id,
-                            "network_id": network_id,
-                            "room_id": room_id,
-                        },
-                    )
-                except self.database_engine.module.IntegrityError:
-                    # We've already inserted, nothing to do.
-                    return
-            else:
-                self.db_pool.simple_delete_txn(
-                    txn,
-                    table="appservice_room_list",
-                    keyvalues={
-                        "appservice_id": appservice_id,
-                        "network_id": network_id,
-                        "room_id": room_id,
-                    },
-                )
-
-            entries = self.db_pool.simple_select_list_txn(
-                txn,
-                table="public_room_list_stream",
+        if is_public:
+            await self.db_pool.simple_upsert(
+                table="appservice_room_list",
                 keyvalues={
+                    "appservice_id": appservice_id,
+                    "network_id": network_id,
                     "room_id": room_id,
+                },
+                values={},
+                insertion_values={
                     "appservice_id": appservice_id,
                     "network_id": network_id,
+                    "room_id": room_id,
                 },
-                retcols=("stream_id", "visibility"),
+                desc="set_room_is_public_appservice_true",
             )
-
-            entries.sort(key=lambda r: r["stream_id"])
-
-            add_to_stream = True
-            if entries:
-                add_to_stream = bool(entries[-1]["visibility"]) != is_public
-
-            if add_to_stream:
-                self.db_pool.simple_insert_txn(
-                    txn,
-                    table="public_room_list_stream",
-                    values={
-                        "stream_id": next_id,
-                        "room_id": room_id,
-                        "visibility": is_public,
-                        "appservice_id": appservice_id,
-                        "network_id": network_id,
-                    },
-                )
-
-        async with self._public_room_id_gen.get_next() as next_id:
-            await self.db_pool.runInteraction(
-                "set_room_is_public_appservice",
-                set_room_is_public_appservice_txn,
-                next_id,
+        else:
+            await self.db_pool.simple_delete(
+                table="appservice_room_list",
+                keyvalues={
+                    "appservice_id": appservice_id,
+                    "network_id": network_id,
+                    "room_id": room_id,
+                },
+                desc="set_room_is_public_appservice_false",
             )
+
         self.hs.get_notifier().on_new_replication_data()
 
     async def add_event_report(
@@ -1787,9 +1651,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
             "get_event_reports_paginate", _get_event_reports_paginate_txn
         )
 
-    def get_current_public_room_stream_id(self):
-        return self._public_room_id_gen.get_current_token()
-
     async def block_room(self, room_id: str, user_id: str) -> None:
         """Marks the room as blocked. Can be called multiple times.
 
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 7e0687e197..a5bc0ee8a5 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SCHEMA_VERSION = 62
+SCHEMA_VERSION = 63
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
@@ -25,6 +25,11 @@ for more information on how this works.
 Changes in SCHEMA_VERSION = 61:
     - The `user_stats_historical` and `room_stats_historical` tables are not written and
       are not read (previously, they were written but not read).
+
+Changes in SCHEMA_VERSION = 63:
+    - The `public_room_list_stream` table is not written nor read to
+      (previously, it was written and read to, but not for any significant purpose).
+      https://github.com/matrix-org/synapse/pull/10565
 """