summary refs log tree commit diff
path: root/synapse/storage/databases/main/roommember.py
diff options
context:
space:
mode:
authorBrendan Abolivier <babolivier@matrix.org>2021-08-31 14:53:31 +0100
committerBrendan Abolivier <babolivier@matrix.org>2021-08-31 14:53:31 +0100
commit1d4f5c34d86cc1b2afaf72c4b176469d3004724d (patch)
treebffbc001eac036be46fd50c0a0b67c94b409539e /synapse/storage/databases/main/roommember.py
parentMerge tag 'v1.32.2' into babolivier/dinsic_1.41.0 (diff)
parent 1.33.0 (diff)
downloadsynapse-1d4f5c34d86cc1b2afaf72c4b176469d3004724d.tar.xz
Merge tag 'v1.33.0' into babolivier/dinsic_1.41.0
Synapse 1.33.0 (2021-05-05)
===========================

Features
--------

- Build Debian packages for Ubuntu 21.04 (Hirsute Hippo). ([\#9909](https://github.com/matrix-org/synapse/issues/9909))

Synapse 1.33.0rc2 (2021-04-29)
==============================

Bugfixes
--------

- Fix tight loop when handling presence replication when using workers. Introduced in v1.33.0rc1. ([\#9900](https://github.com/matrix-org/synapse/issues/9900))

Synapse 1.33.0rc1 (2021-04-28)
==============================

Features
--------

- Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership. ([\#9800](https://github.com/matrix-org/synapse/issues/9800), [\#9814](https://github.com/matrix-org/synapse/issues/9814))
- Add experimental support for handling presence on a worker. ([\#9819](https://github.com/matrix-org/synapse/issues/9819), [\#9820](https://github.com/matrix-org/synapse/issues/9820), [\#9828](https://github.com/matrix-org/synapse/issues/9828), [\#9850](https://github.com/matrix-org/synapse/issues/9850))
- Return a new template when an user attempts to renew their account multiple times with the same token, stating that their account is set to expire. This replaces the invalid token template that would previously be shown in this case. This change concerns the optional account validity feature. ([\#9832](https://github.com/matrix-org/synapse/issues/9832))

Bugfixes
--------

- Fixes the OIDC SSO flow when using a `public_baseurl` value including a non-root URL path. ([\#9726](https://github.com/matrix-org/synapse/issues/9726))
- Fix thumbnail generation for some sites with non-standard content types. Contributed by @rkfg. ([\#9788](https://github.com/matrix-org/synapse/issues/9788))
- Add some sanity checks to identity server passed to 3PID bind/unbind endpoints. ([\#9802](https://github.com/matrix-org/synapse/issues/9802))
- Limit the size of HTTP responses read over federation. ([\#9833](https://github.com/matrix-org/synapse/issues/9833))
- Fix a bug which could cause Synapse to get stuck in a loop of resyncing device lists. ([\#9867](https://github.com/matrix-org/synapse/issues/9867))
- Fix a long-standing bug where errors from federation did not propagate to the client. ([\#9868](https://github.com/matrix-org/synapse/issues/9868))

Improved Documentation
----------------------

- Add a note to the docker docs mentioning that we mirror upstream's supported Docker platforms. ([\#9801](https://github.com/matrix-org/synapse/issues/9801))

Internal Changes
----------------

- Add a dockerfile for running Synapse in worker-mode under Complement. ([\#9162](https://github.com/matrix-org/synapse/issues/9162))
- Apply `pyupgrade` across the codebase. ([\#9786](https://github.com/matrix-org/synapse/issues/9786))
- Move some replication processing out of `generic_worker`. ([\#9796](https://github.com/matrix-org/synapse/issues/9796))
- Replace `HomeServer.get_config()` with inline references. ([\#9815](https://github.com/matrix-org/synapse/issues/9815))
- Rename some handlers and config modules to not duplicate the top-level module. ([\#9816](https://github.com/matrix-org/synapse/issues/9816))
- Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced. ([\#9817](https://github.com/matrix-org/synapse/issues/9817))
- Reduce CPU usage of the user directory by reusing existing calculated room membership. ([\#9821](https://github.com/matrix-org/synapse/issues/9821))
- Small speed up for joining large remote rooms. ([\#9825](https://github.com/matrix-org/synapse/issues/9825))
- Introduce flake8-bugbear to the test suite and fix some of its lint violations. ([\#9838](https://github.com/matrix-org/synapse/issues/9838))
- Only store the raw data in the in-memory caches, rather than objects that include references to e.g. the data stores. ([\#9845](https://github.com/matrix-org/synapse/issues/9845))
- Limit length of accepted email addresses. ([\#9855](https://github.com/matrix-org/synapse/issues/9855))
- Remove redundant `synapse.types.Collection` type definition. ([\#9856](https://github.com/matrix-org/synapse/issues/9856))
- Handle recently added rate limits correctly when using `--no-rate-limit` with the demo scripts. ([\#9858](https://github.com/matrix-org/synapse/issues/9858))
- Disable invite rate-limiting by default when running the unit tests. ([\#9871](https://github.com/matrix-org/synapse/issues/9871))
- Pass a reactor into `SynapseSite` to make testing easier. ([\#9874](https://github.com/matrix-org/synapse/issues/9874))
- Make `DomainSpecificString` an `attrs` class. ([\#9875](https://github.com/matrix-org/synapse/issues/9875))
- Add type hints to `synapse.api.auth` and `synapse.api.auth_blocking` modules. ([\#9876](https://github.com/matrix-org/synapse/issues/9876))
- Remove redundant `_PushHTTPChannel` test class. ([\#9878](https://github.com/matrix-org/synapse/issues/9878))
- Remove backwards-compatibility code for Python versions < 3.6. ([\#9879](https://github.com/matrix-org/synapse/issues/9879))
- Small performance improvement around handling new local presence updates. ([\#9887](https://github.com/matrix-org/synapse/issues/9887))
Diffstat (limited to 'synapse/storage/databases/main/roommember.py')
-rw-r--r--synapse/storage/databases/main/roommember.py201
1 files changed, 127 insertions, 74 deletions
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py

index a9216ca9ae..2a8532f8c1 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # @@ -14,7 +13,20 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Set, Tuple +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + FrozenSet, + Iterable, + List, + Optional, + Set, + Tuple, + Union, +) + +import attr from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase @@ -34,7 +46,7 @@ from synapse.storage.roommember import ( ProfileInfo, RoomsForUser, ) -from synapse.types import Collection, PersistedEventPosition, get_domain_from_id +from synapse.types import PersistedEventPosition, StateMap, get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import _CacheContext, cached, cachedList @@ -54,6 +66,10 @@ class RoomMemberWorkerStore(EventsWorkerStore): def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) + # Used by `_get_joined_hosts` to ensure only one thing mutates the cache + # at a time. Keyed by room_id. + self._joined_host_linearizer = Linearizer("_JoinedHostsCache") + # Is the current_state_events.membership up to date? Or is the # background update still running? self._current_state_events_membership_up_to_date = False @@ -174,6 +190,33 @@ class RoomMemberWorkerStore(EventsWorkerStore): txn.execute(sql, (room_id, Membership.JOIN)) return [r[0] for r in txn] + @cached(max_entries=100000, iterable=True) + async def get_users_in_room_with_profiles( + self, room_id: str + ) -> Dict[str, ProfileInfo]: + """Get a mapping from user ID to profile information for all users in a given room. + + Args: + room_id: The ID of the room to retrieve the users of. + + Returns: + A mapping from user ID to ProfileInfo. + """ + + def _get_users_in_room_with_profiles(txn) -> Dict[str, ProfileInfo]: + sql = """ + SELECT user_id, display_name, avatar_url FROM room_memberships + WHERE room_id = ? AND membership = ? + """ + txn.execute(sql, (room_id, Membership.JOIN)) + + return {r[0]: ProfileInfo(display_name=r[1], avatar_url=r[2]) for r in txn} + + return await self.db_pool.runInteraction( + "get_users_in_room_with_profiles", + _get_users_in_room_with_profiles, + ) + @cached(max_entries=100000) async def get_room_summary(self, room_id: str) -> Dict[str, MemberSummary]: """Get the details of a room roughly suitable for use by the room @@ -704,19 +747,82 @@ class RoomMemberWorkerStore(EventsWorkerStore): @cached(num_args=2, max_entries=10000, iterable=True) async def _get_joined_hosts( - self, room_id, state_group, current_state_ids, state_entry - ): - # We don't use `state_group`, its there so that we can cache based - # on it. However, its important that its never None, since two current_state's - # with a state_group of None are likely to be different. + self, + room_id: str, + state_group: int, + current_state_ids: StateMap[str], + state_entry: "_StateCacheEntry", + ) -> FrozenSet[str]: + # We don't use `state_group`, its there so that we can cache based on + # it. However, its important that its never None, since two + # current_state's with a state_group of None are likely to be different. + # + # The `state_group` must match the `state_entry.state_group` (if not None). assert state_group is not None - + assert state_entry.state_group is None or state_entry.state_group == state_group + + # We use a secondary cache of previous work to allow us to build up the + # joined hosts for the given state group based on previous state groups. + # + # We cache one object per room containing the results of the last state + # group we got joined hosts for. The idea is that generally + # `get_joined_hosts` is called with the "current" state group for the + # room, and so consecutive calls will be for consecutive state groups + # which point to the previous state group. cache = await self._get_joined_hosts_cache(room_id) - return await cache.get_destinations(state_entry) + + # If the state group in the cache matches, we already have the data we need. + if state_entry.state_group == cache.state_group: + return frozenset(cache.hosts_to_joined_users) + + # Since we'll mutate the cache we need to lock. + with (await self._joined_host_linearizer.queue(room_id)): + if state_entry.state_group == cache.state_group: + # Same state group, so nothing to do. We've already checked for + # this above, but the cache may have changed while waiting on + # the lock. + pass + elif state_entry.prev_group == cache.state_group: + # The cached work is for the previous state group, so we work out + # the delta. + for (typ, state_key), event_id in state_entry.delta_ids.items(): + if typ != EventTypes.Member: + continue + + host = intern_string(get_domain_from_id(state_key)) + user_id = state_key + known_joins = cache.hosts_to_joined_users.setdefault(host, set()) + + event = await self.get_event(event_id) + if event.membership == Membership.JOIN: + known_joins.add(user_id) + else: + known_joins.discard(user_id) + + if not known_joins: + cache.hosts_to_joined_users.pop(host, None) + else: + # The cache doesn't match the state group or prev state group, + # so we calculate the result from first principles. + joined_users = await self.get_joined_users_from_state( + room_id, state_entry + ) + + cache.hosts_to_joined_users = {} + for user_id in joined_users: + host = intern_string(get_domain_from_id(user_id)) + cache.hosts_to_joined_users.setdefault(host, set()).add(user_id) + + if state_entry.state_group: + cache.state_group = state_entry.state_group + else: + cache.state_group = object() + + return frozenset(cache.hosts_to_joined_users) @cached(max_entries=10000) def _get_joined_hosts_cache(self, room_id: str) -> "_JoinedHostsCache": - return _JoinedHostsCache(self, room_id) + return _JoinedHostsCache() @cached(num_args=2) async def did_forget(self, user_id: str, room_id: str) -> bool: @@ -1026,71 +1132,18 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore): await self.db_pool.runInteraction("forget_membership", f) +@attr.s(slots=True) class _JoinedHostsCache: - """Cache for joined hosts in a room that is optimised to handle updates - via state deltas. - """ - - def __init__(self, store, room_id): - self.store = store - self.room_id = room_id - - self.hosts_to_joined_users = {} - - self.state_group = object() + """The cached data used by the `_get_joined_hosts_cache`.""" - self.linearizer = Linearizer("_JoinedHostsCache") + # Dict of host to the set of their users in the room at the state group. + hosts_to_joined_users = attr.ib(type=Dict[str, Set[str]], factory=dict) - self._len = 0 - - async def get_destinations(self, state_entry: "_StateCacheEntry") -> Set[str]: - """Get set of destinations for a state entry - - Args: - state_entry - - Returns: - The destinations as a set. - """ - if state_entry.state_group == self.state_group: - return frozenset(self.hosts_to_joined_users) - - with (await self.linearizer.queue(())): - if state_entry.state_group == self.state_group: - pass - elif state_entry.prev_group == self.state_group: - for (typ, state_key), event_id in state_entry.delta_ids.items(): - if typ != EventTypes.Member: - continue - - host = intern_string(get_domain_from_id(state_key)) - user_id = state_key - known_joins = self.hosts_to_joined_users.setdefault(host, set()) - - event = await self.store.get_event(event_id) - if event.membership == Membership.JOIN: - known_joins.add(user_id) - else: - known_joins.discard(user_id) - - if not known_joins: - self.hosts_to_joined_users.pop(host, None) - else: - joined_users = await self.store.get_joined_users_from_state( - self.room_id, state_entry - ) - - self.hosts_to_joined_users = {} - for user_id in joined_users: - host = intern_string(get_domain_from_id(user_id)) - self.hosts_to_joined_users.setdefault(host, set()).add(user_id) - - if state_entry.state_group: - self.state_group = state_entry.state_group - else: - self.state_group = object() - self._len = sum(len(v) for v in self.hosts_to_joined_users.values()) - return frozenset(self.hosts_to_joined_users) + # The state group `hosts_to_joined_users` is derived from. Will be an object + # if the instance is newly created or if the state is not based on a state + # group. (An object is used as a sentinel value to ensure that it never is + # equal to anything else). + state_group = attr.ib(type=Union[object, int], factory=object) def __len__(self): - return self._len + return sum(len(v) for v in self.hosts_to_joined_users.values())