summary refs log tree commit diff
path: root/synapse/storage/room.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/room.py')
-rw-r--r--synapse/storage/room.py149
1 files changed, 134 insertions, 15 deletions
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 8251f58670..11813b44f6 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -48,15 +48,31 @@ class RoomStore(SQLBaseStore):
             StoreError if the room could not be stored.
         """
         try:
-            yield self._simple_insert(
-                "rooms",
-                {
-                    "room_id": room_id,
-                    "creator": room_creator_user_id,
-                    "is_public": is_public,
-                },
-                desc="store_room",
-            )
+            def store_room_txn(txn, next_id):
+                self._simple_insert_txn(
+                    txn,
+                    "rooms",
+                    {
+                        "room_id": room_id,
+                        "creator": room_creator_user_id,
+                        "is_public": is_public,
+                    },
+                )
+                if is_public:
+                    self._simple_insert_txn(
+                        txn,
+                        table="public_room_list_stream",
+                        values={
+                            "stream_id": next_id,
+                            "room_id": room_id,
+                            "visibility": is_public,
+                        }
+                    )
+            with self._public_room_id_gen.get_next() as next_id:
+                yield self.runInteraction(
+                    "store_room_txn",
+                    store_room_txn, next_id,
+                )
         except Exception as e:
             logger.error("store_room with room_id=%s failed: %s", room_id, e)
             raise StoreError(500, "Problem creating room.")
@@ -77,13 +93,45 @@ class RoomStore(SQLBaseStore):
             allow_none=True,
         )
 
+    @defer.inlineCallbacks
     def set_room_is_public(self, room_id, is_public):
-        return self._simple_update_one(
-            table="rooms",
-            keyvalues={"room_id": room_id},
-            updatevalues={"is_public": is_public},
-            desc="set_room_is_public",
-        )
+        def set_room_is_public_txn(txn, next_id):
+            self._simple_update_one_txn(
+                txn,
+                table="rooms",
+                keyvalues={"room_id": room_id},
+                updatevalues={"is_public": is_public},
+            )
+
+            entries = self._simple_select_list_txn(
+                txn,
+                table="public_room_list_stream",
+                keyvalues={"room_id": room_id},
+                retcols=("stream_id", "visibility"),
+            )
+
+            entries.sort(key=lambda r: r["stream_id"])
+
+            add_to_stream = True
+            if entries:
+                add_to_stream = bool(entries[-1]["visibility"]) != is_public
+
+            if add_to_stream:
+                self._simple_insert_txn(
+                    txn,
+                    table="public_room_list_stream",
+                    values={
+                        "stream_id": next_id,
+                        "room_id": room_id,
+                        "visibility": is_public,
+                    }
+                )
+
+        with self._public_room_id_gen.get_next() as next_id:
+            yield self.runInteraction(
+                "set_room_is_public",
+                set_room_is_public_txn, next_id,
+            )
 
     def get_public_room_ids(self):
         return self._simple_select_onecol(
@@ -207,3 +255,74 @@ class RoomStore(SQLBaseStore):
             },
             desc="add_event_report"
         )
+
+    def get_current_public_room_stream_id(self):
+        return self._public_room_id_gen.get_current_token()
+
+    def get_public_room_ids_at_stream_id(self, stream_id):
+        return self.runInteraction(
+            "get_public_room_ids_at_stream_id",
+            self.get_public_room_ids_at_stream_id_txn, stream_id
+        )
+
+    def get_public_room_ids_at_stream_id_txn(self, txn, stream_id):
+        return {
+            rm
+            for rm, vis in self.get_published_at_stream_id_txn(txn, stream_id).items()
+            if vis
+        }
+
+    def get_published_at_stream_id_txn(self, txn, stream_id):
+        sql = ("""
+            SELECT room_id, visibility FROM public_room_list_stream
+            INNER JOIN (
+                SELECT room_id, max(stream_id) AS stream_id
+                FROM public_room_list_stream
+                WHERE stream_id <= ?
+                GROUP BY room_id
+            ) grouped USING (room_id, stream_id)
+        """)
+
+        txn.execute(sql, (stream_id,))
+        return dict(txn.fetchall())
+
+    def get_public_room_changes(self, prev_stream_id, new_stream_id):
+        def get_public_room_changes_txn(txn):
+            then_rooms = self.get_public_room_ids_at_stream_id_txn(txn, prev_stream_id)
+
+            now_rooms_dict = self.get_published_at_stream_id_txn(txn, new_stream_id)
+
+            now_rooms_visible = set(
+                rm for rm, vis in now_rooms_dict.items() if vis
+            )
+            now_rooms_not_visible = set(
+                rm for rm, vis in now_rooms_dict.items() if not vis
+            )
+
+            newly_visible = now_rooms_visible - then_rooms
+            newly_unpublished = now_rooms_not_visible & then_rooms
+
+            return newly_visible, newly_unpublished
+
+        return self.runInteraction(
+            "get_public_room_changes", get_public_room_changes_txn
+        )
+
+    def get_all_new_public_rooms(self, prev_id, current_id, limit):
+        def get_all_new_public_rooms(txn):
+            sql = ("""
+                SELECT stream_id, room_id, visibility FROM public_room_list_stream
+                WHERE stream_id > ? AND stream_id <= ?
+                ORDER BY stream_id ASC
+                LIMIT ?
+            """)
+
+            txn.execute(sql, (prev_id, current_id, limit,))
+            return txn.fetchall()
+
+        if prev_id == current_id:
+            return defer.succeed([])
+
+        return self.runInteraction(
+            "get_all_new_public_rooms", get_all_new_public_rooms
+        )