diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 1309bfd374..78906a5e1d 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1,5 +1,5 @@
# Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019, 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -50,8 +50,14 @@ from synapse.storage.database import (
LoggingTransaction,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
+from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
-from synapse.storage.util.id_generators import IdGenerator
+from synapse.storage.util.id_generators import (
+ AbstractStreamIdGenerator,
+ IdGenerator,
+ MultiWriterIdGenerator,
+ StreamIdGenerator,
+)
from synapse.types import JsonDict, RetentionPolicy, ThirdPartyInstanceID
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@@ -114,6 +120,26 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
self.config: HomeServerConfig = hs.config
+ self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator
+
+ if isinstance(database.engine, PostgresEngine):
+ self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
+ db_conn=db_conn,
+ db=database,
+ stream_name="un_partial_stated_room_stream",
+ instance_name=self._instance_name,
+ tables=[
+ ("un_partial_stated_room_stream", "instance_name", "stream_id")
+ ],
+ sequence_name="un_partial_stated_room_stream_sequence",
+ # TODO(faster_joins, multiple writers) Support multiple writers.
+ writers=["master"],
+ )
+ else:
+ self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator(
+ db_conn, "un_partial_stated_room_stream", "stream_id"
+ )
+
async def store_room(
self,
room_id: str,
@@ -1216,70 +1242,6 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
return room_servers
- async def clear_partial_state_room(self, room_id: str) -> bool:
- """Clears the partial state flag for a room.
-
- Args:
- room_id: The room whose partial state flag is to be cleared.
-
- Returns:
- `True` if the partial state flag has been cleared successfully.
-
- `False` if the partial state flag could not be cleared because the room
- still contains events with partial state.
- """
- try:
- await self.db_pool.runInteraction(
- "clear_partial_state_room", self._clear_partial_state_room_txn, room_id
- )
- return True
- except self.db_pool.engine.module.IntegrityError as e:
- # Assume that any `IntegrityError`s are due to partial state events.
- logger.info(
- "Exception while clearing lazy partial-state-room %s, retrying: %s",
- room_id,
- e,
- )
- return False
-
- def _clear_partial_state_room_txn(
- self, txn: LoggingTransaction, room_id: str
- ) -> None:
- DatabasePool.simple_delete_txn(
- txn,
- table="partial_state_rooms_servers",
- keyvalues={"room_id": room_id},
- )
- DatabasePool.simple_delete_one_txn(
- txn,
- table="partial_state_rooms",
- keyvalues={"room_id": room_id},
- )
- self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
- self._invalidate_cache_and_stream(
- txn, self.get_partial_state_servers_at_join, (room_id,)
- )
-
- # We now delete anything from `device_lists_remote_pending` with a
- # stream ID less than the minimum
- # `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
- device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
- txn,
- table="partial_state_rooms",
- keyvalues={},
- retcol="MIN(device_lists_stream_id)",
- allow_none=True,
- )
- if device_lists_stream_id is None:
- # There are no rooms being currently partially joined, so we delete everything.
- txn.execute("DELETE FROM device_lists_remote_pending")
- else:
- sql = """
- DELETE FROM device_lists_remote_pending
- WHERE stream_id <= ?
- """
- txn.execute(sql, (device_lists_stream_id,))
-
@cached()
async def is_partial_state_room(self, room_id: str) -> bool:
"""Checks if this room has partial state.
@@ -1315,6 +1277,66 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
)
return result["join_event_id"], result["device_lists_stream_id"]
+ def get_un_partial_stated_rooms_token(self) -> int:
+ # TODO(faster_joins, multiple writers): This is inappropriate if there
+ # are multiple writers because workers that don't write often will
+ # hold all readers up.
+ # (See `MultiWriterIdGenerator.get_persisted_upto_position` for an
+ # explanation.)
+ return self._un_partial_stated_rooms_stream_id_gen.get_current_token()
+
+ async def get_un_partial_stated_rooms_from_stream(
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
+ """Get updates for caches replication stream.
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
+ """
+
+ if last_id == current_id:
+ return [], current_id, False
+
+ def get_un_partial_stated_rooms_from_stream_txn(
+ txn: LoggingTransaction,
+ ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
+ sql = """
+ SELECT stream_id, room_id
+ FROM un_partial_stated_room_stream
+ WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
+ ORDER BY stream_id ASC
+ LIMIT ?
+ """
+ txn.execute(sql, (last_id, current_id, instance_name, limit))
+ updates = [(row[0], (row[1],)) for row in txn]
+ limited = False
+ upto_token = current_id
+ if len(updates) >= limit:
+ upto_token = updates[-1][0]
+ limited = True
+
+ return updates, upto_token, limited
+
+ return await self.db_pool.runInteraction(
+ "get_un_partial_stated_rooms_from_stream",
+ get_un_partial_stated_rooms_from_stream_txn,
+ )
+
class _BackgroundUpdates:
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
@@ -1806,6 +1828,8 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
+ self._instance_name = hs.get_instance_name()
+
async def upsert_room_on_join(
self, room_id: str, room_version: RoomVersion, state_events: List[EventBase]
) -> None:
@@ -2270,3 +2294,84 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
self.is_room_blocked,
(room_id,),
)
+
+ async def clear_partial_state_room(self, room_id: str) -> bool:
+ """Clears the partial state flag for a room.
+
+ Args:
+ room_id: The room whose partial state flag is to be cleared.
+
+ Returns:
+ `True` if the partial state flag has been cleared successfully.
+
+ `False` if the partial state flag could not be cleared because the room
+ still contains events with partial state.
+ """
+ try:
+ async with self._un_partial_stated_rooms_stream_id_gen.get_next() as un_partial_state_room_stream_id:
+ await self.db_pool.runInteraction(
+ "clear_partial_state_room",
+ self._clear_partial_state_room_txn,
+ room_id,
+ un_partial_state_room_stream_id,
+ )
+ return True
+ except self.db_pool.engine.module.IntegrityError as e:
+ # Assume that any `IntegrityError`s are due to partial state events.
+ logger.info(
+ "Exception while clearing lazy partial-state-room %s, retrying: %s",
+ room_id,
+ e,
+ )
+ return False
+
+ def _clear_partial_state_room_txn(
+ self,
+ txn: LoggingTransaction,
+ room_id: str,
+ un_partial_state_room_stream_id: int,
+ ) -> None:
+ DatabasePool.simple_delete_txn(
+ txn,
+ table="partial_state_rooms_servers",
+ keyvalues={"room_id": room_id},
+ )
+ DatabasePool.simple_delete_one_txn(
+ txn,
+ table="partial_state_rooms",
+ keyvalues={"room_id": room_id},
+ )
+ self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
+ self._invalidate_cache_and_stream(
+ txn, self.get_partial_state_servers_at_join, (room_id,)
+ )
+
+ DatabasePool.simple_insert_txn(
+ txn,
+ "un_partial_stated_room_stream",
+ {
+ "stream_id": un_partial_state_room_stream_id,
+ "instance_name": self._instance_name,
+ "room_id": room_id,
+ },
+ )
+
+ # We now delete anything from `device_lists_remote_pending` with a
+ # stream ID less than the minimum
+ # `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
+ device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
+ txn,
+ table="partial_state_rooms",
+ keyvalues={},
+ retcol="MIN(device_lists_stream_id)",
+ allow_none=True,
+ )
+ if device_lists_stream_id is None:
+ # There are no rooms being currently partially joined, so we delete everything.
+ txn.execute("DELETE FROM device_lists_remote_pending")
+ else:
+ sql = """
+ DELETE FROM device_lists_remote_pending
+ WHERE stream_id <= ?
+ """
+ txn.execute(sql, (device_lists_stream_id,))
diff --git a/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql b/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql
new file mode 100644
index 0000000000..743196cfe3
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql
@@ -0,0 +1,32 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Stream for notifying that a room has become un-partial-stated.
+CREATE TABLE un_partial_stated_room_stream(
+ -- Position in the stream
+ stream_id BIGINT PRIMARY KEY NOT NULL,
+
+ -- Which instance wrote this entry.
+ instance_name TEXT NOT NULL,
+
+ -- Which room has been un-partial-stated.
+ room_id TEXT NOT NULL REFERENCES rooms(room_id) ON DELETE CASCADE
+);
+
+-- We want an index here because of the foreign key constraint:
+-- upon deleting a room, the database needs to be able to check here.
+-- This index is not unique because we can join a room multiple times in a server's lifetime,
+-- so the same room could be un-partial-stated multiple times!
+CREATE INDEX un_partial_stated_room_stream_room_id ON un_partial_stated_room_stream (room_id);
diff --git a/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres b/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres
new file mode 100644
index 0000000000..c1aac0b385
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres
@@ -0,0 +1,20 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS un_partial_stated_room_stream_sequence;
+
+SELECT setval('un_partial_stated_room_stream_sequence', (
+ SELECT COALESCE(MAX(stream_id), 1) FROM un_partial_stated_room_stream
+));
|