diff options
Diffstat (limited to 'synapse/storage/roommember.py')
-rw-r--r-- | synapse/storage/roommember.py | 149 |
1 files changed, 124 insertions, 25 deletions
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 367dbbbcf6..0829ae5bee 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -18,7 +18,9 @@ from twisted.internet import defer from collections import namedtuple from ._base import SQLBaseStore +from synapse.util.caches import intern_string from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from synapse.util.stringutils import to_ascii from synapse.api.constants import Membership, EventTypes from synapse.types import get_domain_from_id @@ -35,6 +37,13 @@ RoomsForUser = namedtuple( ) +# We store this using a namedtuple so that we save about 3x space over using a +# dict. +ProfileInfo = namedtuple( + "ProfileInfo", ("avatar_url", "display_name") +) + + _MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update" @@ -139,7 +148,7 @@ class RoomMemberStore(SQLBaseStore): hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids) defer.returnValue(hosts) - @cached(max_entries=500000, iterable=True) + @cached(max_entries=100000, iterable=True) def get_users_in_room(self, room_id): def f(txn): sql = ( @@ -152,7 +161,7 @@ class RoomMemberStore(SQLBaseStore): ) txn.execute(sql, (room_id, Membership.JOIN,)) - return [r[0] for r in txn] + return [to_ascii(r[0]) for r in txn] return self.runInteraction("get_users_in_room", f) @cached() @@ -378,7 +387,9 @@ class RoomMemberStore(SQLBaseStore): state_group = object() return self._get_joined_users_from_context( - event.room_id, state_group, context.current_state_ids, event=event, + event.room_id, state_group, context.current_state_ids, + event=event, + context=context, ) def get_joined_users_from_state(self, room_id, state_group, state_ids): @@ -396,46 +407,95 @@ class RoomMemberStore(SQLBaseStore): @cachedInlineCallbacks(num_args=2, cache_context=True, iterable=True, max_entries=100000) def _get_joined_users_from_context(self, room_id, state_group, current_state_ids, - cache_context, event=None): + cache_context, event=None, context=None): # We don't use `state_group`, it's there so that we can cache based # on it. However, it's important that it's never None, since two current_states # with a state_group of None are likely to be different. # See bulk_get_push_rules_for_room for how we work around this. assert state_group is not None + users_in_room = {} member_event_ids = [ e_id for key, e_id in current_state_ids.iteritems() if key[0] == EventTypes.Member ] - rows = yield self._simple_select_many_batch( - table="room_memberships", - column="event_id", - iterable=member_event_ids, - retcols=['user_id', 'display_name', 'avatar_url'], - keyvalues={ - "membership": Membership.JOIN, - }, - batch_size=500, - desc="_get_joined_users_from_context", + if context is not None: + # If we have a context with a delta from a previous state group, + # check if we also have the result from the previous group in cache. + # If we do then we can reuse that result and simply update it with + # any membership changes in `delta_ids` + if context.prev_group and context.delta_ids: + prev_res = self._get_joined_users_from_context.cache.get( + (room_id, context.prev_group), None + ) + if prev_res and isinstance(prev_res, dict): + users_in_room = dict(prev_res) + member_event_ids = [ + e_id + for key, e_id in context.delta_ids.iteritems() + if key[0] == EventTypes.Member + ] + for etype, state_key in context.delta_ids: + users_in_room.pop(state_key, None) + + # We check if we have any of the member event ids in the event cache + # before we ask the DB + + # We don't update the event cache hit ratio as it completely throws off + # the hit ratio counts. After all, we don't populate the cache if we + # miss it here + event_map = self._get_events_from_cache( + member_event_ids, + allow_rejected=False, + update_metrics=False, ) - users_in_room = { - row["user_id"]: { - "display_name": row["display_name"], - "avatar_url": row["avatar_url"], - } - for row in rows - } + missing_member_event_ids = [] + for event_id in member_event_ids: + ev_entry = event_map.get(event_id) + if ev_entry: + if ev_entry.event.membership == Membership.JOIN: + users_in_room[to_ascii(ev_entry.event.state_key)] = ProfileInfo( + display_name=to_ascii( + ev_entry.event.content.get("displayname", None) + ), + avatar_url=to_ascii( + ev_entry.event.content.get("avatar_url", None) + ), + ) + else: + missing_member_event_ids.append(event_id) + + if missing_member_event_ids: + rows = yield self._simple_select_many_batch( + table="room_memberships", + column="event_id", + iterable=missing_member_event_ids, + retcols=('user_id', 'display_name', 'avatar_url',), + keyvalues={ + "membership": Membership.JOIN, + }, + batch_size=500, + desc="_get_joined_users_from_context", + ) + + users_in_room.update({ + to_ascii(row["user_id"]): ProfileInfo( + avatar_url=to_ascii(row["avatar_url"]), + display_name=to_ascii(row["display_name"]), + ) + for row in rows + }) if event is not None and event.type == EventTypes.Member: if event.membership == Membership.JOIN: if event.event_id in member_event_ids: - users_in_room[event.state_key] = { - "display_name": event.content.get("displayname", None), - "avatar_url": event.content.get("avatar_url", None), - } + users_in_room[to_ascii(event.state_key)] = ProfileInfo( + display_name=to_ascii(event.content.get("displayname", None)), + avatar_url=to_ascii(event.content.get("avatar_url", None)), + ) defer.returnValue(users_in_room) @@ -474,6 +534,45 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(False) + def get_joined_hosts(self, room_id, state_group, state_ids): + if not state_group: + # If state_group is None it means it has yet to be assigned a + # state group, i.e. we need to make sure that calls with a state_group + # of None don't hit previous cached calls with a None state_group. + # To do this we set the state_group to a new object as object() != object() + state_group = object() + + return self._get_joined_hosts( + room_id, state_group, state_ids + ) + + @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True) + def _get_joined_hosts(self, room_id, state_group, current_state_ids): + # We don't use `state_group`, its there so that we can cache based + # on it. However, its important that its never None, since two current_state's + # with a state_group of None are likely to be different. + # See bulk_get_push_rules_for_room for how we work around this. + assert state_group is not None + + joined_hosts = set() + for etype, state_key in current_state_ids: + if etype == EventTypes.Member: + try: + host = get_domain_from_id(state_key) + except: + logger.warn("state_key not user_id: %s", state_key) + continue + + if host in joined_hosts: + continue + + event_id = current_state_ids[(etype, state_key)] + event = yield self.get_event(event_id, allow_none=True) + if event and event.content["membership"] == Membership.JOIN: + joined_hosts.add(intern_string(host)) + + defer.returnValue(joined_hosts) + @defer.inlineCallbacks def _background_add_membership_profile(self, progress, batch_size): target_min_stream_id = progress.get( |