diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/federation.py | 28 | ||||
-rw-r--r-- | synapse/handlers/message.py | 6 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 11 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 12 | ||||
-rw-r--r-- | synapse/handlers/user_directory.py | 446 |
5 files changed, 484 insertions, 19 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 52d97dfbf3..39d2bee8da 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -43,7 +43,6 @@ from synapse.events.utils import prune_event from synapse.util.retryutils import NotRetryingDestination -from synapse.push.action_generator import ActionGenerator from synapse.util.distributor import user_joined_room from twisted.internet import defer @@ -75,6 +74,7 @@ class FederationHandler(BaseHandler): self.state_handler = hs.get_state_handler() self.server_name = hs.hostname self.keyring = hs.get_keyring() + self.action_generator = hs.get_action_generator() self.replication_layer.set_handler(self) @@ -832,7 +832,11 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def on_event_auth(self, event_id): - auth = yield self.store.get_auth_chain([event_id]) + event = yield self.store.get_event(event_id) + auth = yield self.store.get_auth_chain( + [auth_id for auth_id, _ in event.auth_events], + include_given=True + ) for event in auth: event.signatures.update( @@ -1047,9 +1051,7 @@ class FederationHandler(BaseHandler): yield user_joined_room(self.distributor, user, event.room_id) state_ids = context.prev_state_ids.values() - auth_chain = yield self.store.get_auth_chain(set( - [event.event_id] + state_ids - )) + auth_chain = yield self.store.get_auth_chain(state_ids) state = yield self.store.get_events(context.prev_state_ids.values()) @@ -1100,6 +1102,9 @@ class FederationHandler(BaseHandler): user_id, "leave" ) + # Mark as outlier as we don't have any state for this event; we're not + # even in the room. + event.internal_metadata.outlier = True event = self._sign_event(event) # Try the host that we succesfully called /make_leave/ on first for @@ -1389,8 +1394,7 @@ class FederationHandler(BaseHandler): ) if not event.internal_metadata.is_outlier(): - action_generator = ActionGenerator(self.hs) - yield action_generator.handle_push_actions_for_event( + yield self.action_generator.handle_push_actions_for_event( event, context ) @@ -1599,7 +1603,11 @@ class FederationHandler(BaseHandler): pass # Now get the current auth_chain for the event. - local_auth_chain = yield self.store.get_auth_chain([event_id]) + event = yield self.store.get_event(event_id) + local_auth_chain = yield self.store.get_auth_chain( + [auth_id for auth_id, _ in event.auth_events], + include_given=True + ) # TODO: Check if we would now reject event_id. If so we need to tell # everyone. @@ -1792,7 +1800,9 @@ class FederationHandler(BaseHandler): auth_ids = yield self.auth.compute_auth_events( event, context.prev_state_ids ) - local_auth_chain = yield self.store.get_auth_chain(auth_ids) + local_auth_chain = yield self.store.get_auth_chain( + auth_ids, include_given=True + ) try: # 2. Get remote difference. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 196925edad..a04f634c5c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -20,7 +20,6 @@ from synapse.api.errors import AuthError, Codes, SynapseError from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator -from synapse.push.action_generator import ActionGenerator from synapse.types import ( UserID, RoomAlias, RoomStreamToken, ) @@ -54,6 +53,8 @@ class MessageHandler(BaseHandler): # This is to stop us from diverging history *too* much. self.limiter = Limiter(max_count=5) + self.action_generator = hs.get_action_generator() + @defer.inlineCallbacks def purge_history(self, room_id, event_id): event = yield self.store.get_event(event_id) @@ -590,8 +591,7 @@ class MessageHandler(BaseHandler): "Changing the room create event is forbidden", ) - action_generator = ActionGenerator(self.hs) - yield action_generator.handle_push_actions_for_event( + yield self.action_generator.handle_push_actions_for_event( event, context ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c0205da1a9..91c6c6be3c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -117,6 +117,8 @@ class SyncResult(collections.namedtuple("SyncResult", [ "archived", # ArchivedSyncResult for each archived room. "to_device", # List of direct messages for the device. "device_lists", # List of user_ids whose devices have chanegd + "device_one_time_keys_count", # Dict of algorithm to count for one time keys + # for this device ])): __slots__ = [] @@ -550,6 +552,14 @@ class SyncHandler(object): sync_result_builder ) + device_id = sync_config.device_id + one_time_key_counts = {} + if device_id: + user_id = sync_config.user.to_string() + one_time_key_counts = yield self.store.count_e2e_one_time_keys( + user_id, device_id + ) + defer.returnValue(SyncResult( presence=sync_result_builder.presence, account_data=sync_result_builder.account_data, @@ -558,6 +568,7 @@ class SyncHandler(object): archived=sync_result_builder.archived, to_device=sync_result_builder.to_device, device_lists=device_lists, + device_one_time_keys_count=one_time_key_counts, next_batch=sync_result_builder.now_token, )) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 3b7818af5c..82dedbbc99 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -89,7 +89,7 @@ class TypingHandler(object): until = self._member_typing_until.get(member, None) if not until or until <= now: logger.info("Timing out typing for: %s", member.user_id) - preserve_fn(self._stopped_typing)(member) + self._stopped_typing(member) continue # Check if we need to resend a keep alive over federation for this @@ -147,7 +147,7 @@ class TypingHandler(object): # No point sending another notification defer.returnValue(None) - yield self._push_update( + self._push_update( member=member, typing=True, ) @@ -171,7 +171,7 @@ class TypingHandler(object): member = RoomMember(room_id=room_id, user_id=target_user_id) - yield self._stopped_typing(member) + self._stopped_typing(member) @defer.inlineCallbacks def user_left_room(self, user, room_id): @@ -180,7 +180,6 @@ class TypingHandler(object): member = RoomMember(room_id=room_id, user_id=user_id) yield self._stopped_typing(member) - @defer.inlineCallbacks def _stopped_typing(self, member): if member.user_id not in self._room_typing.get(member.room_id, set()): # No point @@ -189,16 +188,15 @@ class TypingHandler(object): self._member_typing_until.pop(member, None) self._member_last_federation_poke.pop(member, None) - yield self._push_update( + self._push_update( member=member, typing=False, ) - @defer.inlineCallbacks def _push_update(self, member, typing): if self.hs.is_mine_id(member.user_id): # Only send updates for changes to our own users. - yield self._push_remote(member, typing) + preserve_fn(self._push_remote)(member, typing) self._push_update_local( member=member, diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py new file mode 100644 index 0000000000..f4451e5dfb --- /dev/null +++ b/synapse/handlers/user_directory.py @@ -0,0 +1,446 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.storage.roommember import ProfileInfo +from synapse.util.metrics import Measure + + +logger = logging.getLogger(__name__) + + +class UserDirectoyHandler(object): + """Handles querying of and keeping updated the user_directory. + + N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY + + The user directory is filled with users who this server can see are joined to a + world_readable or publically joinable room. We keep a database table up to date + by streaming changes of the current state and recalculating whether users should + be in the directory or not when necessary. + + For each user in the directory we also store a room_id which is public and that the + user is joined to. This allows us to ignore history_visibility and join_rules changes + for that user in all other public rooms, as we know they'll still be in at least + one public room. + """ + + def __init__(self, hs): + self.store = hs.get_datastore() + self.state = hs.get_state_handler() + self.server_name = hs.hostname + self.clock = hs.get_clock() + self.notifier = hs.get_notifier() + + self.notifier.add_replication_callback(self.notify_new_event) + + # When start up for the first time we need to populate the user_directory. + # This is a set of user_id's we've inserted already + self.initially_handled_users = set() + self.initially_handled_users_in_public = set() + + # The current position in the current_state_delta stream + self.pos = None + + # Guard to ensure we only process deltas one at a time + self._is_processing = False + + # We kick this off so that we don't have to wait for a change before + # we start populating the user directory + self.clock.call_later(0, self.notify_new_event) + + def search_users(self, search_term, limit): + """Searches for users in directory + + Returns: + dict of the form:: + + { + "limited": <bool>, # whether there were more results or not + "results": [ # Ordered by best match first + { + "user_id": <user_id>, + "display_name": <display_name>, + "avatar_url": <avatar_url> + } + ] + } + """ + return self.store.search_user_dir(search_term, limit) + + @defer.inlineCallbacks + def notify_new_event(self): + """Called when there may be more deltas to process + """ + if self._is_processing: + return + + self._is_processing = True + try: + yield self._unsafe_process() + finally: + self._is_processing = False + + @defer.inlineCallbacks + def _unsafe_process(self): + # If self.pos is None then means we haven't fetched it from DB + if self.pos is None: + self.pos = yield self.store.get_user_directory_stream_pos() + + # If still None then we need to do the initial fill of directory + if self.pos is None: + yield self._do_initial_spam() + self.pos = yield self.store.get_user_directory_stream_pos() + + # Loop round handling deltas until we're up to date + while True: + with Measure(self.clock, "user_dir_delta"): + deltas = yield self.store.get_current_state_deltas(self.pos) + if not deltas: + return + + yield self._handle_deltas(deltas) + + self.pos = deltas[-1]["stream_id"] + yield self.store.update_user_directory_stream_pos(self.pos) + + @defer.inlineCallbacks + def _do_initial_spam(self): + """Populates the user_directory from the current state of the DB, used + when synapse first starts with user_directory support + """ + new_pos = yield self.store.get_max_stream_id_in_current_state_deltas() + + # Delete any existing entries just in case there are any + yield self.store.delete_all_from_user_dir() + + # We process by going through each existing room at a time. + room_ids = yield self.store.get_all_rooms() + + logger.info("Doing initial update of user directory. %d rooms", len(room_ids)) + num_processed_rooms = 1 + + for room_id in room_ids: + logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids)) + yield self._handle_intial_room(room_id) + num_processed_rooms += 1 + + logger.info("Processed all rooms.") + + self.initially_handled_users = None + + yield self.store.update_user_directory_stream_pos(new_pos) + + @defer.inlineCallbacks + def _handle_intial_room(self, room_id): + """Called when we initially fill out user_directory one room at a time + """ + is_in_room = yield self.store.is_host_joined(room_id, self.server_name) + if not is_in_room: + return + + is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + unhandled_users = set(users_with_profile) - self.initially_handled_users + + yield self.store.add_profiles_to_user_dir( + room_id, { + user_id: users_with_profile[user_id] for user_id in unhandled_users + } + ) + + self.initially_handled_users |= unhandled_users + + if is_public: + yield self.store.add_users_to_public_room( + room_id, + user_ids=unhandled_users - self.initially_handled_users_in_public + ) + self.initially_handled_users_in_public != unhandled_users + + @defer.inlineCallbacks + def _handle_deltas(self, deltas): + """Called with the state deltas to process + """ + for delta in deltas: + typ = delta["type"] + state_key = delta["state_key"] + room_id = delta["room_id"] + event_id = delta["event_id"] + prev_event_id = delta["prev_event_id"] + + logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + + # For join rule and visibility changes we need to check if the room + # may have become public or not and add/remove the users in said room + if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules): + yield self._handle_room_publicity_change( + room_id, prev_event_id, event_id, typ, + ) + elif typ == EventTypes.Member: + change = yield self._get_key_change( + prev_event_id, event_id, + key_name="membership", + public_value=Membership.JOIN, + ) + + if change is None: + # Handle any profile changes + yield self._handle_profile_change( + state_key, room_id, prev_event_id, event_id, + ) + continue + + if not change: + # Need to check if the server left the room entirely, if so + # we might need to remove all the users in that room + is_in_room = yield self.store.is_host_joined( + room_id, self.server_name, + ) + if not is_in_room: + logger.debug("Server left room: %r", room_id) + # Fetch all the users that we marked as being in user + # directory due to being in the room and then check if + # need to remove those users or not + user_ids = yield self.store.get_users_in_dir_due_to_room(room_id) + for user_id in user_ids: + yield self._handle_remove_user(room_id, user_id) + return + else: + logger.debug("Server is still in room: %r", room_id) + + if change: # The user joined + event = yield self.store.get_event(event_id, allow_none=True) + profile = ProfileInfo( + avatar_url=event.content.get("avatar_url"), + display_name=event.content.get("displayname"), + ) + + yield self._handle_new_user(room_id, state_key, profile) + else: # The user left + yield self._handle_remove_user(room_id, state_key) + else: + logger.debug("Ignoring irrelevant type: %r", typ) + + @defer.inlineCallbacks + def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ): + """Handle a room having potentially changed from/to world_readable/publically + joinable. + + Args: + room_id (str) + prev_event_id (str|None): The previous event before the state change + event_id (str|None): The new event after the state change + typ (str): Type of the event + """ + logger.debug("Handling change for %s", typ) + + if typ == EventTypes.RoomHistoryVisibility: + change = yield self._get_key_change( + prev_event_id, event_id, + key_name="history_visibility", + public_value="world_readable", + ) + elif typ == EventTypes.JoinRules: + change = yield self._get_key_change( + prev_event_id, event_id, + key_name="join_rule", + public_value=JoinRules.PUBLIC, + ) + else: + raise Exception("Invalid event type") + # If change is None, no change. True => become world_readable/public, + # False => was world_readable/public + if change is None: + logger.debug("No change") + return + + # There's been a change to or from being world readable. + + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + logger.debug("Change: %r, is_public: %r", change, is_public) + + if change and not is_public: + # If we became world readable but room isn't currently public then + # we ignore the change + return + elif not change and is_public: + # If we stopped being world readable but are still public, + # ignore the change + return + + if change: + users_with_profile = yield self.state.get_current_user_in_room(room_id) + for user_id, profile in users_with_profile.iteritems(): + yield self._handle_new_user(room_id, user_id, profile) + else: + users = yield self.store.get_users_in_public_due_to_room(room_id) + for user_id in users: + yield self._handle_remove_user(room_id, user_id) + + @defer.inlineCallbacks + def _handle_new_user(self, room_id, user_id, profile): + """Called when we might need to add user to directory + + Args: + room_id (str): room_id that user joined or started being public that + user_id (str) + """ + logger.debug("Adding user to dir, %r", user_id) + + row = yield self.store.get_user_in_directory(user_id) + if not row: + yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if not is_public: + return + + row = yield self.store.get_user_in_public_room(user_id) + if not row: + yield self.store.add_users_to_public_room(room_id, [user_id]) + + @defer.inlineCallbacks + def _handle_remove_user(self, room_id, user_id): + """Called when we might need to remove user to directory + + Args: + room_id (str): room_id that user left or stopped being public that + user_id (str) + """ + logger.debug("Maybe removing user %r", user_id) + + row = yield self.store.get_user_in_directory(user_id) + update_user_dir = row and row["room_id"] == room_id + + row = yield self.store.get_user_in_public_room(user_id) + update_user_in_public = row and row["room_id"] == room_id + + if not update_user_in_public and not update_user_dir: + return + + # XXX: Make this faster? + rooms = yield self.store.get_rooms_for_user(user_id) + for j_room_id in rooms: + if not update_user_in_public and not update_user_dir: + break + + is_in_room = yield self.store.is_host_joined( + j_room_id, self.server_name, + ) + + if not is_in_room: + continue + + if update_user_dir: + update_user_dir = False + yield self.store.update_user_in_user_dir(user_id, j_room_id) + + if update_user_in_public: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + j_room_id + ) + + if is_public: + yield self.store.update_user_in_public_user_list(user_id, j_room_id) + update_user_in_public = False + + if update_user_dir: + yield self.store.remove_from_user_dir(user_id) + elif update_user_in_public: + yield self.store.remove_from_user_in_public_room(user_id) + + @defer.inlineCallbacks + def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id): + """Check member event changes for any profile changes and update the + database if there are. + """ + if not prev_event_id or not event_id: + return + + prev_event = yield self.store.get_event(prev_event_id, allow_none=True) + event = yield self.store.get_event(event_id, allow_none=True) + + if not prev_event or not event: + return + + if event.membership != Membership.JOIN: + return + + prev_name = prev_event.content.get("displayname") + new_name = event.content.get("displayname") + + prev_avatar = prev_event.content.get("avatar_url") + new_avatar = event.content.get("avatar_url") + + if prev_name != new_name or prev_avatar != new_avatar: + yield self.store.update_profile_in_user_dir( + user_id, new_name, new_avatar, room_id, + ) + + @defer.inlineCallbacks + def _get_key_change(self, prev_event_id, event_id, key_name, public_value): + """Given two events check if the `key_name` field in content changed + from not matching `public_value` to doing so. + + For example, check if `history_visibility` (`key_name`) changed from + `shared` to `world_readable` (`public_value`). + + Returns: + None if the field in the events either both match `public_value` + or if neither do, i.e. there has been no change. + True if it didnt match `public_value` but now does + False if it did match `public_value` but now doesn't + """ + prev_event = None + event = None + if prev_event_id: + prev_event = yield self.store.get_event(prev_event_id, allow_none=True) + + if event_id: + event = yield self.store.get_event(event_id, allow_none=True) + + if not event and not prev_event: + logger.debug("Neither event exists: %r %r", prev_event_id, event_id) + defer.returnValue(None) + + prev_value = None + value = None + + if prev_event: + prev_value = prev_event.content.get(key_name) + + if event: + value = event.content.get(key_name) + + logger.debug("prev_value: %r -> value: %r", prev_value, value) + + if value == public_value and prev_value != public_value: + defer.returnValue(True) + elif value != public_value and prev_value == public_value: + defer.returnValue(False) + else: + defer.returnValue(None) |