diff options
Diffstat (limited to 'synapse/storage/databases/main/room.py')
-rw-r--r-- | synapse/storage/databases/main/room.py | 237 |
1 files changed, 171 insertions, 66 deletions
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 1309bfd374..78906a5e1d 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1,5 +1,5 @@ # Copyright 2014-2016 OpenMarket Ltd -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2019, 2022 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -50,8 +50,14 @@ from synapse.storage.database import ( LoggingTransaction, ) from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore +from synapse.storage.engines import PostgresEngine from synapse.storage.types import Cursor -from synapse.storage.util.id_generators import IdGenerator +from synapse.storage.util.id_generators import ( + AbstractStreamIdGenerator, + IdGenerator, + MultiWriterIdGenerator, + StreamIdGenerator, +) from synapse.types import JsonDict, RetentionPolicy, ThirdPartyInstanceID from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -114,6 +120,26 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): self.config: HomeServerConfig = hs.config + self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator + + if isinstance(database.engine, PostgresEngine): + self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + stream_name="un_partial_stated_room_stream", + instance_name=self._instance_name, + tables=[ + ("un_partial_stated_room_stream", "instance_name", "stream_id") + ], + sequence_name="un_partial_stated_room_stream_sequence", + # TODO(faster_joins, multiple writers) Support multiple writers. + writers=["master"], + ) + else: + self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator( + db_conn, "un_partial_stated_room_stream", "stream_id" + ) + async def store_room( self, room_id: str, @@ -1216,70 +1242,6 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): return room_servers - async def clear_partial_state_room(self, room_id: str) -> bool: - """Clears the partial state flag for a room. - - Args: - room_id: The room whose partial state flag is to be cleared. - - Returns: - `True` if the partial state flag has been cleared successfully. - - `False` if the partial state flag could not be cleared because the room - still contains events with partial state. - """ - try: - await self.db_pool.runInteraction( - "clear_partial_state_room", self._clear_partial_state_room_txn, room_id - ) - return True - except self.db_pool.engine.module.IntegrityError as e: - # Assume that any `IntegrityError`s are due to partial state events. - logger.info( - "Exception while clearing lazy partial-state-room %s, retrying: %s", - room_id, - e, - ) - return False - - def _clear_partial_state_room_txn( - self, txn: LoggingTransaction, room_id: str - ) -> None: - DatabasePool.simple_delete_txn( - txn, - table="partial_state_rooms_servers", - keyvalues={"room_id": room_id}, - ) - DatabasePool.simple_delete_one_txn( - txn, - table="partial_state_rooms", - keyvalues={"room_id": room_id}, - ) - self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,)) - self._invalidate_cache_and_stream( - txn, self.get_partial_state_servers_at_join, (room_id,) - ) - - # We now delete anything from `device_lists_remote_pending` with a - # stream ID less than the minimum - # `partial_state_rooms.device_lists_stream_id`, as we no longer need them. - device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn( - txn, - table="partial_state_rooms", - keyvalues={}, - retcol="MIN(device_lists_stream_id)", - allow_none=True, - ) - if device_lists_stream_id is None: - # There are no rooms being currently partially joined, so we delete everything. - txn.execute("DELETE FROM device_lists_remote_pending") - else: - sql = """ - DELETE FROM device_lists_remote_pending - WHERE stream_id <= ? - """ - txn.execute(sql, (device_lists_stream_id,)) - @cached() async def is_partial_state_room(self, room_id: str) -> bool: """Checks if this room has partial state. @@ -1315,6 +1277,66 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): ) return result["join_event_id"], result["device_lists_stream_id"] + def get_un_partial_stated_rooms_token(self) -> int: + # TODO(faster_joins, multiple writers): This is inappropriate if there + # are multiple writers because workers that don't write often will + # hold all readers up. + # (See `MultiWriterIdGenerator.get_persisted_upto_position` for an + # explanation.) + return self._un_partial_stated_rooms_stream_id_gen.get_current_token() + + async def get_un_partial_stated_rooms_from_stream( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]: + """Get updates for caches replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + + if last_id == current_id: + return [], current_id, False + + def get_un_partial_stated_rooms_from_stream_txn( + txn: LoggingTransaction, + ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]: + sql = """ + SELECT stream_id, room_id + FROM un_partial_stated_room_stream + WHERE ? < stream_id AND stream_id <= ? AND instance_name = ? + ORDER BY stream_id ASC + LIMIT ? + """ + txn.execute(sql, (last_id, current_id, instance_name, limit)) + updates = [(row[0], (row[1],)) for row in txn] + limited = False + upto_token = current_id + if len(updates) >= limit: + upto_token = updates[-1][0] + limited = True + + return updates, upto_token, limited + + return await self.db_pool.runInteraction( + "get_un_partial_stated_rooms_from_stream", + get_un_partial_stated_rooms_from_stream_txn, + ) + class _BackgroundUpdates: REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory" @@ -1806,6 +1828,8 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id") + self._instance_name = hs.get_instance_name() + async def upsert_room_on_join( self, room_id: str, room_version: RoomVersion, state_events: List[EventBase] ) -> None: @@ -2270,3 +2294,84 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): self.is_room_blocked, (room_id,), ) + + async def clear_partial_state_room(self, room_id: str) -> bool: + """Clears the partial state flag for a room. + + Args: + room_id: The room whose partial state flag is to be cleared. + + Returns: + `True` if the partial state flag has been cleared successfully. + + `False` if the partial state flag could not be cleared because the room + still contains events with partial state. + """ + try: + async with self._un_partial_stated_rooms_stream_id_gen.get_next() as un_partial_state_room_stream_id: + await self.db_pool.runInteraction( + "clear_partial_state_room", + self._clear_partial_state_room_txn, + room_id, + un_partial_state_room_stream_id, + ) + return True + except self.db_pool.engine.module.IntegrityError as e: + # Assume that any `IntegrityError`s are due to partial state events. + logger.info( + "Exception while clearing lazy partial-state-room %s, retrying: %s", + room_id, + e, + ) + return False + + def _clear_partial_state_room_txn( + self, + txn: LoggingTransaction, + room_id: str, + un_partial_state_room_stream_id: int, + ) -> None: + DatabasePool.simple_delete_txn( + txn, + table="partial_state_rooms_servers", + keyvalues={"room_id": room_id}, + ) + DatabasePool.simple_delete_one_txn( + txn, + table="partial_state_rooms", + keyvalues={"room_id": room_id}, + ) + self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,)) + self._invalidate_cache_and_stream( + txn, self.get_partial_state_servers_at_join, (room_id,) + ) + + DatabasePool.simple_insert_txn( + txn, + "un_partial_stated_room_stream", + { + "stream_id": un_partial_state_room_stream_id, + "instance_name": self._instance_name, + "room_id": room_id, + }, + ) + + # We now delete anything from `device_lists_remote_pending` with a + # stream ID less than the minimum + # `partial_state_rooms.device_lists_stream_id`, as we no longer need them. + device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn( + txn, + table="partial_state_rooms", + keyvalues={}, + retcol="MIN(device_lists_stream_id)", + allow_none=True, + ) + if device_lists_stream_id is None: + # There are no rooms being currently partially joined, so we delete everything. + txn.execute("DELETE FROM device_lists_remote_pending") + else: + sql = """ + DELETE FROM device_lists_remote_pending + WHERE stream_id <= ? + """ + txn.execute(sql, (device_lists_stream_id,)) |