diff options
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r-- | synapse/storage/databases/main/__init__.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/filtering.py | 5 | ||||
-rw-r--r-- | synapse/storage/databases/main/profile.py | 12 | ||||
-rw-r--r-- | synapse/storage/databases/main/room.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/roommember.py | 122 | ||||
-rw-r--r-- | synapse/storage/databases/main/stats.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/user_directory.py | 13 |
7 files changed, 22 insertions, 140 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index b6028853c9..be67d1ff22 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -15,7 +15,7 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, List, Optional, Tuple, Union, cast +from typing import TYPE_CHECKING, List, Optional, Tuple, cast from synapse.api.constants import Direction from synapse.config.homeserver import HomeServerConfig @@ -196,7 +196,7 @@ class DataStore( txn: LoggingTransaction, ) -> Tuple[List[JsonDict], int]: filters = [] - args: List[Union[str, int]] = [] + args: list = [] # Set ordering order_by_column = UserSortOrder(order_by).value diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py index 75f7fe8756..fff417f9e3 100644 --- a/synapse/storage/databases/main/filtering.py +++ b/synapse/storage/databases/main/filtering.py @@ -188,13 +188,14 @@ class FilteringWorkerStore(SQLBaseStore): filter_id = max_id + 1 sql = ( - "INSERT INTO user_filters (full_user_id, filter_id, filter_json)" - "VALUES(?, ?, ?)" + "INSERT INTO user_filters (full_user_id, user_id, filter_id, filter_json)" + "VALUES(?, ?, ?, ?)" ) txn.execute( sql, ( user_id.to_string(), + user_id.localpart, filter_id, bytearray(def_json), ), diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py index 660a5507b7..3ba9cc8853 100644 --- a/synapse/storage/databases/main/profile.py +++ b/synapse/storage/databases/main/profile.py @@ -173,9 +173,10 @@ class ProfileWorkerStore(SQLBaseStore): ) async def create_profile(self, user_id: UserID) -> None: + user_localpart = user_id.localpart await self.db_pool.simple_insert( table="profiles", - values={"full_user_id": user_id.to_string()}, + values={"user_id": user_localpart, "full_user_id": user_id.to_string()}, desc="create_profile", ) @@ -190,11 +191,13 @@ class ProfileWorkerStore(SQLBaseStore): new_displayname: The new display name. If this is None, the user's display name is removed. """ + user_localpart = user_id.localpart await self.db_pool.simple_upsert( table="profiles", - keyvalues={"full_user_id": user_id.to_string()}, + keyvalues={"user_id": user_localpart}, values={ "displayname": new_displayname, + "full_user_id": user_id.to_string(), }, desc="set_profile_displayname", ) @@ -210,10 +213,11 @@ class ProfileWorkerStore(SQLBaseStore): new_avatar_url: The new avatar URL. If this is None, the user's avatar is removed. """ + user_localpart = user_id.localpart await self.db_pool.simple_upsert( table="profiles", - keyvalues={"full_user_id": user_id.to_string()}, - values={"avatar_url": new_avatar_url}, + keyvalues={"user_id": user_localpart}, + values={"avatar_url": new_avatar_url, "full_user_id": user_id.to_string()}, desc="set_profile_avatar_url", ) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index ca8be8c80d..830658f328 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -2136,7 +2136,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): raise StoreError(400, "No create event in state") # Before MSC2175, the room creator was a separate field. - if not room_version.msc2175_implicit_room_creator: + if not room_version.implicit_room_creator: room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR) if not isinstance(room_creator, str): diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 582875c91a..fff259f74c 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from itertools import chain from typing import ( TYPE_CHECKING, AbstractSet, @@ -57,15 +56,12 @@ from synapse.types import ( StrCollection, get_domain_from_id, ) -from synapse.util.async_helpers import Linearizer -from synapse.util.caches import intern_string from synapse.util.caches.descriptors import _CacheContext, cached, cachedList from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure if TYPE_CHECKING: from synapse.server import HomeServer - from synapse.state import _StateCacheEntry logger = logging.getLogger(__name__) @@ -91,10 +87,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): ): super().__init__(database, db_conn, hs) - # Used by `_get_joined_hosts` to ensure only one thing mutates the cache - # at a time. Keyed by room_id. - self._joined_host_linearizer = Linearizer("_JoinedHostsCache") - self._server_notices_mxid = hs.config.servernotices.server_notices_mxid if ( @@ -1057,120 +1049,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): "get_current_hosts_in_room_ordered", get_current_hosts_in_room_ordered_txn ) - async def get_joined_hosts( - self, room_id: str, state: StateMap[str], state_entry: "_StateCacheEntry" - ) -> FrozenSet[str]: - state_group: Union[object, int] = state_entry.state_group - if not state_group: - # If state_group is None it means it has yet to be assigned a - # state group, i.e. we need to make sure that calls with a state_group - # of None don't hit previous cached calls with a None state_group. - # To do this we set the state_group to a new object as object() != object() - state_group = object() - - assert state_group is not None - with Measure(self._clock, "get_joined_hosts"): - return await self._get_joined_hosts( - room_id, state_group, state, state_entry=state_entry - ) - - @cached(num_args=2, max_entries=10000, iterable=True) - async def _get_joined_hosts( - self, - room_id: str, - state_group: Union[object, int], - state: StateMap[str], - state_entry: "_StateCacheEntry", - ) -> FrozenSet[str]: - # We don't use `state_group`, it's there so that we can cache based on - # it. However, its important that its never None, since two - # current_state's with a state_group of None are likely to be different. - # - # The `state_group` must match the `state_entry.state_group` (if not None). - assert state_group is not None - assert state_entry.state_group is None or state_entry.state_group == state_group - - # We use a secondary cache of previous work to allow us to build up the - # joined hosts for the given state group based on previous state groups. - # - # We cache one object per room containing the results of the last state - # group we got joined hosts for. The idea is that generally - # `get_joined_hosts` is called with the "current" state group for the - # room, and so consecutive calls will be for consecutive state groups - # which point to the previous state group. - cache = await self._get_joined_hosts_cache(room_id) - - # If the state group in the cache matches, we already have the data we need. - if state_entry.state_group == cache.state_group: - return frozenset(cache.hosts_to_joined_users) - - # Since we'll mutate the cache we need to lock. - async with self._joined_host_linearizer.queue(room_id): - if state_entry.state_group == cache.state_group: - # Same state group, so nothing to do. We've already checked for - # this above, but the cache may have changed while waiting on - # the lock. - pass - elif state_entry.prev_group == cache.state_group: - # The cached work is for the previous state group, so we work out - # the delta. - assert state_entry.delta_ids is not None - for (typ, state_key), event_id in state_entry.delta_ids.items(): - if typ != EventTypes.Member: - continue - - host = intern_string(get_domain_from_id(state_key)) - user_id = state_key - known_joins = cache.hosts_to_joined_users.setdefault(host, set()) - - event = await self.get_event(event_id) - if event.membership == Membership.JOIN: - known_joins.add(user_id) - else: - known_joins.discard(user_id) - - if not known_joins: - cache.hosts_to_joined_users.pop(host, None) - else: - # The cache doesn't match the state group or prev state group, - # so we calculate the result from first principles. - # - # We need to fetch all hosts joined to the room according to `state` by - # inspecting all join memberships in `state`. However, if the `state` is - # relatively recent then many of its events are likely to be held in - # the current state of the room, which is easily available and likely - # cached. - # - # We therefore compute the set of `state` events not in the - # current state and only fetch those. - current_memberships = ( - await self._get_approximate_current_memberships_in_room(room_id) - ) - unknown_state_events = {} - joined_users_in_current_state = [] - - for (type, state_key), event_id in state.items(): - if event_id not in current_memberships: - unknown_state_events[type, state_key] = event_id - elif current_memberships[event_id] == Membership.JOIN: - joined_users_in_current_state.append(state_key) - - joined_user_ids = await self.get_joined_user_ids_from_state( - room_id, unknown_state_events - ) - - cache.hosts_to_joined_users = {} - for user_id in chain(joined_user_ids, joined_users_in_current_state): - host = intern_string(get_domain_from_id(user_id)) - cache.hosts_to_joined_users.setdefault(host, set()).add(user_id) - - if state_entry.state_group: - cache.state_group = state_entry.state_group - else: - cache.state_group = object() - - return frozenset(cache.hosts_to_joined_users) - async def _get_approximate_current_memberships_in_room( self, room_id: str ) -> Mapping[str, Optional[str]]: diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py index 97c4dc2603..f34b7ce8f4 100644 --- a/synapse/storage/databases/main/stats.py +++ b/synapse/storage/databases/main/stats.py @@ -697,7 +697,7 @@ class StatsStore(StateDeltasStore): txn: LoggingTransaction, ) -> Tuple[List[JsonDict], int]: filters = [] - args = [self.hs.config.server.server_name] + args: list = [] if search_term: filters.append("(lmr.user_id LIKE ? OR displayname LIKE ?)") @@ -733,7 +733,7 @@ class StatsStore(StateDeltasStore): sql_base = """ FROM local_media_repository as lmr - LEFT JOIN profiles AS p ON lmr.user_id = '@' || p.user_id || ':' || ? + LEFT JOIN profiles AS p ON lmr.user_id = p.full_user_id {} GROUP BY lmr.user_id, displayname """.format( diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 924022c95c..2a136f2ff6 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -409,23 +409,22 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): txn, users_to_work_on ) - # Next fetch their profiles. Note that the `user_id` here is the - # *localpart*, and that not all users have profiles. + # Next fetch their profiles. Note that not all users have profiles. profile_rows = self.db_pool.simple_select_many_txn( txn, table="profiles", - column="user_id", - iterable=[get_localpart_from_id(u) for u in users_to_insert], + column="full_user_id", + iterable=list(users_to_insert), retcols=( - "user_id", + "full_user_id", "displayname", "avatar_url", ), keyvalues={}, ) profiles = { - f"@{row['user_id']}:{self.server_name}": _UserDirProfile( - f"@{row['user_id']}:{self.server_name}", + row["full_user_id"]: _UserDirProfile( + row["full_user_id"], row["displayname"], row["avatar_url"], ) |