diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/__init__.py | 2 | ||||
-rw-r--r-- | synapse/handlers/device.py | 46 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 44 | ||||
-rw-r--r-- | synapse/handlers/room.py | 1 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 3 | ||||
-rw-r--r-- | synapse/notifier.py | 1 | ||||
-rw-r--r-- | synapse/replication/slave/storage/events.py | 3 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/keys.py | 2 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 21 | ||||
-rw-r--r-- | synapse/util/caches/descriptors.py | 5 |
10 files changed, 74 insertions, 54 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index 498ded38c0..d3f445be9c 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.18.7" +__version__ = "0.19.0" diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 815410969c..8cb47ac417 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -17,7 +17,8 @@ from synapse.api import errors from synapse.api.constants import EventTypes from synapse.util import stringutils from synapse.util.async import Linearizer -from synapse.types import get_domain_from_id +from synapse.util.metrics import measure_func +from synapse.types import get_domain_from_id, RoomStreamToken from twisted.internet import defer from ._base import BaseHandler @@ -193,25 +194,28 @@ class DeviceHandler(BaseHandler): else: raise + @measure_func("notify_device_update") @defer.inlineCallbacks def notify_device_update(self, user_id, device_ids): """Notify that a user's device(s) has changed. Pokes the notifier, and remote servers if the user is local. """ - rooms = yield self.store.get_rooms_for_user(user_id) - room_ids = [r.room_id for r in rooms] + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) hosts = set() if self.hs.is_mine_id(user_id): - for room_id in room_ids: - users = yield self.store.get_users_in_room(room_id) - hosts.update(get_domain_from_id(u) for u in users) + hosts.update(get_domain_from_id(u) for u in users_who_share_room) hosts.discard(self.server_name) position = yield self.store.add_device_change_to_streams( user_id, device_ids, list(hosts) ) + rooms = yield self.store.get_rooms_for_user(user_id) + room_ids = [r.room_id for r in rooms] + yield self.notifier.on_new_event( "device_list_key", position, rooms=room_ids, ) @@ -221,6 +225,7 @@ class DeviceHandler(BaseHandler): for host in hosts: self.federation_sender.send_device_messages(host) + @measure_func("device.get_user_ids_changed") @defer.inlineCallbacks def get_user_ids_changed(self, user_id, from_token): """Get list of users that have had the devices updated, or have newly @@ -243,15 +248,15 @@ class DeviceHandler(BaseHandler): possibly_changed = set(changed) for room_id in rooms_changed: - # Fetch (an approximation) of the current state at the time. - event_rows, token = yield self.store.get_recent_event_ids_for_room( - room_id, end_token=from_token.room_key, limit=1, - ) + # Fetch the current state at the time. + stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key) - if event_rows: - last_event_id = event_rows[-1]["event_id"] - prev_state_ids = yield self.store.get_state_ids_for_event(last_event_id) - else: + try: + event_ids = yield self.store.get_forward_extremeties_for_room( + room_id, stream_ordering=stream_ordering + ) + prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) + except: prev_state_ids = {} current_state_ids = yield self.state.get_current_state_ids(room_id) @@ -266,14 +271,15 @@ class DeviceHandler(BaseHandler): if not prev_event_id or prev_event_id != event_id: possibly_changed.add(state_key) - user_ids_changed = set() - for other_user_id in possibly_changed: - other_rooms = yield self.store.get_rooms_for_user(other_user_id) - if room_ids.intersection(e.room_id for e in other_rooms): - user_ids_changed.add(other_user_id) + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) - defer.returnValue(user_ids_changed) + # Take the intersection of the users whose devices may have changed + # and those that actually still share a room with the user + defer.returnValue(users_who_share_room & possibly_changed) + @measure_func("_incoming_device_list_update") @defer.inlineCallbacks def _incoming_device_list_update(self, origin, edu_content): user_id = edu_content["user_id"] diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9982ae0fed..fdfce2a88c 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1011,7 +1011,7 @@ class PresenceEventSource(object): @defer.inlineCallbacks @log_function def get_new_events(self, user, from_key, room_ids=None, include_offline=True, - **kwargs): + explicit_room_id=None, **kwargs): # The process for getting presence events are: # 1. Get the rooms the user is in. # 2. Get the list of user in the rooms. @@ -1028,22 +1028,24 @@ class PresenceEventSource(object): user_id = user.to_string() if from_key is not None: from_key = int(from_key) - room_ids = room_ids or [] presence = self.get_presence_handler() stream_change_cache = self.store.presence_stream_cache - if not room_ids: - rooms = yield self.store.get_rooms_for_user(user_id) - room_ids = set(e.room_id for e in rooms) - else: - room_ids = set(room_ids) - max_token = self.store.get_current_presence_token() plist = yield self.store.get_presence_list_accepted(user.localpart) - friends = set(row["observed_user_id"] for row in plist) - friends.add(user_id) # So that we receive our own presence + users_interested_in = set(row["observed_user_id"] for row in plist) + users_interested_in.add(user_id) # So that we receive our own presence + + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) + users_interested_in.update(users_who_share_room) + + if explicit_room_id: + user_ids = yield self.store.get_users_in_room(explicit_room_id) + users_interested_in.update(user_ids) user_ids_changed = set() changed = None @@ -1055,35 +1057,19 @@ class PresenceEventSource(object): # work out if we share a room or they're in our presence list get_updates_counter.inc("stream") for other_user_id in changed: - if other_user_id in friends: + if other_user_id in users_interested_in: user_ids_changed.add(other_user_id) - continue - other_rooms = yield self.store.get_rooms_for_user(other_user_id) - if room_ids.intersection(e.room_id for e in other_rooms): - user_ids_changed.add(other_user_id) - continue else: # Too many possible updates. Find all users we can see and check # if any of them have changed. get_updates_counter.inc("full") - user_ids_to_check = set() - for room_id in room_ids: - users = yield self.store.get_users_in_room(room_id) - user_ids_to_check.update(users) - - user_ids_to_check.update(friends) - - # Always include yourself. Only really matters for when the user is - # not in any rooms, but still. - user_ids_to_check.add(user_id) - if from_key: user_ids_changed = stream_change_cache.get_entities_changed( - user_ids_to_check, from_key, + users_interested_in, from_key, ) else: - user_ids_changed = user_ids_to_check + user_ids_changed = users_interested_in updates = yield presence.current_state_for_users(user_ids_changed) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5f18007e90..7e7671c9a2 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -437,6 +437,7 @@ class RoomEventSource(object): limit, room_ids, is_guest, + explicit_room_id=None, ): # We just ignore the key for now. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9a44de3f33..d7dcd1ce5b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -16,7 +16,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.util.async import concurrently_execute from synapse.util.logcontext import LoggingContext -from synapse.util.metrics import Measure +from synapse.util.metrics import Measure, measure_func from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client @@ -561,6 +561,7 @@ class SyncHandler(object): next_batch=sync_result_builder.now_token, )) + @measure_func("_generate_sync_entry_for_device_list") @defer.inlineCallbacks def _generate_sync_entry_for_device_list(self, sync_result_builder): user_id = sync_result_builder.sync_config.user.to_string() diff --git a/synapse/notifier.py b/synapse/notifier.py index acbd4bb5ae..8051a7a842 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -378,6 +378,7 @@ class Notifier(object): limit=limit, is_guest=is_peeking, room_ids=room_ids, + explicit_room_id=explicit_room_id, ) if name == "room": diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 15a025a019..d72ff6055c 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -73,6 +73,9 @@ class SlavedEventStore(BaseSlavedStore): # to reach inside the __dict__ to extract them. get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"] get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"] + get_users_who_share_room_with_user = ( + RoomMemberStore.__dict__["get_users_who_share_room_with_user"] + ) get_latest_event_ids_in_room = EventFederationStore.__dict__[ "get_latest_event_ids_in_room" ] diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index f99b53530a..6a3cfe84f8 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -193,7 +193,7 @@ class KeyChangesServlet(RestServlet): ) defer.returnValue((200, { - "changed": changed + "changed": list(changed), })) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index ee800d074f..545d3d3a99 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -129,7 +129,7 @@ class RoomMemberStore(SQLBaseStore): with self._stream_id_gen.get_next() as stream_ordering: yield self.runInteraction("locally_reject_invite", f, stream_ordering) - @cached(max_entries=100000, iterable=True) + @cached(max_entries=500000, iterable=True) def get_users_in_room(self, room_id): def f(txn): @@ -274,12 +274,29 @@ class RoomMemberStore(SQLBaseStore): return rows - @cached(max_entries=5000) + @cached(max_entries=500000, iterable=True) def get_rooms_for_user(self, user_id): return self.get_rooms_for_user_where_membership_is( user_id, membership_list=[Membership.JOIN], ) + @cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True) + def get_users_who_share_room_with_user(self, user_id, cache_context): + """Returns the set of users who share a room with `user_id` + """ + rooms = yield self.get_rooms_for_user( + user_id, on_invalidate=cache_context.invalidate, + ) + + user_who_share_room = set() + for room in rooms: + user_ids = yield self.get_users_in_room( + room.room_id, on_invalidate=cache_context.invalidate, + ) + user_who_share_room.update(user_ids) + + defer.returnValue(user_who_share_room) + def forget(self, user_id, room_id): """Indicate that user_id wishes to discard history for room_id.""" def f(txn): diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 675bfd5feb..998de70d29 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -478,6 +478,11 @@ class CacheListDescriptor(object): class _CacheContext(namedtuple("_CacheContext", ("cache", "key"))): + # We rely on _CacheContext implementing __eq__ and __hash__ sensibly, + # which namedtuple does for us (i.e. two _CacheContext are the same if + # their caches and keys match). This is important in particular to + # dedupe when we add callbacks to lru cache nodes, otherwise the number + # of callbacks would grow. def invalidate(self): self.cache.invalidate(self.key) |