diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/auth.py | 21 | ||||
-rw-r--r-- | synapse/handlers/device.py | 4 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 26 | ||||
-rw-r--r-- | synapse/handlers/message.py | 9 | ||||
-rw-r--r-- | synapse/handlers/room.py | 21 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 10 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 12 | ||||
-rw-r--r-- | synapse/handlers/user_directory.py | 296 |
8 files changed, 331 insertions, 68 deletions
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index e7a1bb7246..b00446bec0 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -21,6 +21,7 @@ from synapse.api.constants import LoginType from synapse.types import UserID from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError from synapse.util.async import run_on_reactor +from synapse.util.caches.expiringcache import ExpiringCache from twisted.web.client import PartialDownloadError @@ -52,7 +53,15 @@ class AuthHandler(BaseHandler): LoginType.DUMMY: self._check_dummy_auth, } self.bcrypt_rounds = hs.config.bcrypt_rounds - self.sessions = {} + + # This is not a cache per se, but a store of all current sessions that + # expire after N hours + self.sessions = ExpiringCache( + cache_name="register_sessions", + clock=hs.get_clock(), + expiry_ms=self.SESSION_EXPIRE_MS, + reset_expiry_on_get=True, + ) account_handler = _AccountHandler( hs, check_user_exists=self.check_user_exists @@ -617,16 +626,6 @@ class AuthHandler(BaseHandler): logger.debug("Saving session %s", session) session["last_used"] = self.hs.get_clock().time_msec() self.sessions[session["id"]] = session - self._prune_sessions() - - def _prune_sessions(self): - for sid, sess in self.sessions.items(): - last_used = 0 - if 'last_used' in sess: - last_used = sess['last_used'] - now = self.hs.get_clock().time_msec() - if last_used < now - AuthHandler.SESSION_EXPIRE_MS: - del self.sessions[sid] def hash(self, password): """Computes a secure hash of password. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 982cda3edf..ed60d494ff 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -106,7 +106,7 @@ class DeviceHandler(BaseHandler): device_map = yield self.store.get_devices_by_user(user_id) ips = yield self.store.get_last_client_ip_by_device( - devices=((user_id, device_id) for device_id in device_map.keys()) + user_id, device_id=None ) devices = device_map.values() @@ -133,7 +133,7 @@ class DeviceHandler(BaseHandler): except errors.StoreError: raise errors.NotFoundError ips = yield self.store.get_last_client_ip_by_device( - devices=((user_id, device_id),) + user_id, device_id, ) _update_device_from_client_ips(device, ips) defer.returnValue(device) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a333acc4aa..483cb8eac6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -75,6 +75,7 @@ class FederationHandler(BaseHandler): self.server_name = hs.hostname self.keyring = hs.get_keyring() self.action_generator = hs.get_action_generator() + self.is_mine_id = hs.is_mine_id self.replication_layer.set_handler(self) @@ -1068,6 +1069,24 @@ class FederationHandler(BaseHandler): """ event = pdu + is_blocked = yield self.store.is_room_blocked(event.room_id) + if is_blocked: + raise SynapseError(403, "This room has been blocked on this server") + + membership = event.content.get("membership") + if event.type != EventTypes.Member or membership != Membership.INVITE: + raise SynapseError(400, "The event was not an m.room.member invite event") + + sender_domain = get_domain_from_id(event.sender) + if sender_domain != origin: + raise SynapseError(400, "The invite event was not from the server sending it") + + if event.state_key is None: + raise SynapseError(400, "The invite event did not have a state key") + + if not self.is_mine_id(event.state_key): + raise SynapseError(400, "The invite event must be for this server") + event.internal_metadata.outlier = True event.internal_metadata.invite_from_remote = True @@ -1102,6 +1121,9 @@ class FederationHandler(BaseHandler): user_id, "leave" ) + # Mark as outlier as we don't have any state for this event; we're not + # even in the room. + event.internal_metadata.outlier = True event = self._sign_event(event) # Try the host that we succesfully called /make_leave/ on first for @@ -1273,7 +1295,7 @@ class FederationHandler(BaseHandler): for event in res: # We sign these again because there was a bug where we # incorrectly signed things the first time round - if self.hs.is_mine_id(event.event_id): + if self.is_mine_id(event.event_id): event.signatures.update( compute_event_signature( event, @@ -1346,7 +1368,7 @@ class FederationHandler(BaseHandler): ) if event: - if self.hs.is_mine_id(event.event_id): + if self.is_mine_id(event.event_id): # FIXME: This is a temporary work around where we occasionally # return events slightly differently than when they were # originally signed diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a04f634c5c..24c9ffdb20 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -34,6 +34,7 @@ from canonicaljson import encode_canonical_json import logging import random +import ujson logger = logging.getLogger(__name__) @@ -498,6 +499,14 @@ class MessageHandler(BaseHandler): logger.warn("Denying new event %r because %s", event, err) raise err + # Ensure that we can round trip before trying to persist in db + try: + dump = ujson.dumps(event.content) + ujson.loads(dump) + except: + logger.exception("Failed to encode content: %r", event.content) + raise + yield self.maybe_kick_guest_users(event, context) if event.type == EventTypes.CanonicalAlias: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index d2a0d6520a..5698d28088 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -61,7 +61,7 @@ class RoomCreationHandler(BaseHandler): } @defer.inlineCallbacks - def create_room(self, requester, config): + def create_room(self, requester, config, ratelimit=True): """ Creates a new room. Args: @@ -75,7 +75,8 @@ class RoomCreationHandler(BaseHandler): """ user_id = requester.user.to_string() - yield self.ratelimit(requester) + if ratelimit: + yield self.ratelimit(requester) if "room_alias_name" in config: for wchar in string.whitespace: @@ -167,6 +168,7 @@ class RoomCreationHandler(BaseHandler): initial_state=initial_state, creation_content=creation_content, room_alias=room_alias, + power_level_content_override=config.get("power_level_content_override", {}) ) if "name" in config: @@ -245,7 +247,8 @@ class RoomCreationHandler(BaseHandler): invite_list, initial_state, creation_content, - room_alias + room_alias, + power_level_content_override, ): def create(etype, content, **kwargs): e = { @@ -291,7 +294,15 @@ class RoomCreationHandler(BaseHandler): ratelimit=False, ) - if (EventTypes.PowerLevels, '') not in initial_state: + # We treat the power levels override specially as this needs to be one + # of the first events that get sent into a room. + pl_content = initial_state.pop((EventTypes.PowerLevels, ''), None) + if pl_content is not None: + yield send( + etype=EventTypes.PowerLevels, + content=pl_content, + ) + else: power_level_content = { "users": { creator_id: 100, @@ -316,6 +327,8 @@ class RoomCreationHandler(BaseHandler): for invitee in invite_list: power_level_content["users"][invitee] = 100 + power_level_content.update(power_level_content_override) + yield send( etype=EventTypes.PowerLevels, content=power_level_content, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 1ca88517a2..b3f979b246 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -203,6 +203,11 @@ class RoomMemberHandler(BaseHandler): if not remote_room_hosts: remote_room_hosts = [] + if effective_membership_state not in ("leave", "ban",): + is_blocked = yield self.store.is_room_blocked(room_id) + if is_blocked: + raise SynapseError(403, "This room has been blocked on this server") + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) current_state_ids = yield self.state_handler.get_current_state_ids( room_id, latest_event_ids=latest_event_ids, @@ -369,6 +374,11 @@ class RoomMemberHandler(BaseHandler): # so don't really fit into the general auth process. raise AuthError(403, "Guest access not allowed") + if event.membership not in (Membership.LEAVE, Membership.BAN): + is_blocked = yield self.store.is_room_blocked(room_id) + if is_blocked: + raise SynapseError(403, "This room has been blocked on this server") + yield message_handler.handle_new_client_event( requester, event, diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 3b7818af5c..82dedbbc99 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -89,7 +89,7 @@ class TypingHandler(object): until = self._member_typing_until.get(member, None) if not until or until <= now: logger.info("Timing out typing for: %s", member.user_id) - preserve_fn(self._stopped_typing)(member) + self._stopped_typing(member) continue # Check if we need to resend a keep alive over federation for this @@ -147,7 +147,7 @@ class TypingHandler(object): # No point sending another notification defer.returnValue(None) - yield self._push_update( + self._push_update( member=member, typing=True, ) @@ -171,7 +171,7 @@ class TypingHandler(object): member = RoomMember(room_id=room_id, user_id=target_user_id) - yield self._stopped_typing(member) + self._stopped_typing(member) @defer.inlineCallbacks def user_left_room(self, user, room_id): @@ -180,7 +180,6 @@ class TypingHandler(object): member = RoomMember(room_id=room_id, user_id=user_id) yield self._stopped_typing(member) - @defer.inlineCallbacks def _stopped_typing(self, member): if member.user_id not in self._room_typing.get(member.room_id, set()): # No point @@ -189,16 +188,15 @@ class TypingHandler(object): self._member_typing_until.pop(member, None) self._member_last_federation_poke.pop(member, None) - yield self._push_update( + self._push_update( member=member, typing=False, ) - @defer.inlineCallbacks def _push_update(self, member, typing): if self.hs.is_mine_id(member.user_id): # Only send updates for changes to our own users. - yield self._push_remote(member, typing) + preserve_fn(self._push_remote)(member, typing) self._push_update_local( member=member, diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 0182cf86d6..2a49456bfc 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -14,12 +14,12 @@ # limitations under the License. import logging - from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.storage.roommember import ProfileInfo from synapse.util.metrics import Measure +from synapse.util.async import sleep logger = logging.getLogger(__name__) @@ -41,28 +41,41 @@ class UserDirectoyHandler(object): one public room. """ + INITIAL_SLEEP_MS = 50 + INITIAL_SLEEP_COUNT = 100 + INITIAL_BATCH_SIZE = 100 + def __init__(self, hs): self.store = hs.get_datastore() self.state = hs.get_state_handler() self.server_name = hs.hostname self.clock = hs.get_clock() + self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id + self.update_user_directory = hs.config.update_user_directory # When start up for the first time we need to populate the user_directory. # This is a set of user_id's we've inserted already self.initially_handled_users = set() self.initially_handled_users_in_public = set() + self.initially_handled_users_share = set() + self.initially_handled_users_share_private_room = set() + # The current position in the current_state_delta stream self.pos = None # Guard to ensure we only process deltas one at a time self._is_processing = False - # We kick this off so that we don't have to wait for a change before - # we start populating the user directory - self.clock.call_later(0, self.notify_new_event) + if self.update_user_directory: + self.notifier.add_replication_callback(self.notify_new_event) - def search_users(self, search_term, limit): + # We kick this off so that we don't have to wait for a change before + # we start populating the user directory + self.clock.call_later(0, self.notify_new_event) + + def search_users(self, user_id, search_term, limit): """Searches for users in directory Returns: @@ -79,12 +92,15 @@ class UserDirectoyHandler(object): ] } """ - return self.store.search_user_dir(search_term, limit) + return self.store.search_user_dir(user_id, search_term, limit) @defer.inlineCallbacks def notify_new_event(self): """Called when there may be more deltas to process """ + if not self.update_user_directory: + return + if self._is_processing: return @@ -112,6 +128,7 @@ class UserDirectoyHandler(object): if not deltas: return + logger.info("Handling %d state deltas", len(deltas)) yield self._handle_deltas(deltas) self.pos = deltas[-1]["stream_id"] @@ -130,10 +147,21 @@ class UserDirectoyHandler(object): # We process by going through each existing room at a time. room_ids = yield self.store.get_all_rooms() + logger.info("Doing initial update of user directory. %d rooms", len(room_ids)) + num_processed_rooms = 1 + for room_id in room_ids: + logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids)) yield self._handle_intial_room(room_id) + num_processed_rooms += 1 + yield sleep(self.INITIAL_SLEEP_MS / 1000.) + + logger.info("Processed all rooms.") self.initially_handled_users = None + self.initially_handled_users_in_public = None + self.initially_handled_users_share = None + self.initially_handled_users_share_private_room = None yield self.store.update_user_directory_stream_pos(new_pos) @@ -141,14 +169,15 @@ class UserDirectoyHandler(object): def _handle_intial_room(self, room_id): """Called when we initially fill out user_directory one room at a time """ - is_in_room = yield self.state.get_is_host_in_room(room_id, self.server_name) + is_in_room = yield self.store.is_host_joined(room_id, self.server_name) if not is_in_room: return is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) users_with_profile = yield self.state.get_current_user_in_room(room_id) - unhandled_users = set(users_with_profile) - self.initially_handled_users + user_ids = set(users_with_profile) + unhandled_users = user_ids - self.initially_handled_users yield self.store.add_profiles_to_user_dir( room_id, { @@ -161,9 +190,76 @@ class UserDirectoyHandler(object): if is_public: yield self.store.add_users_to_public_room( room_id, - user_ids=unhandled_users - self.initially_handled_users_in_public + user_ids=user_ids - self.initially_handled_users_in_public ) - self.initially_handled_users_in_public != unhandled_users + self.initially_handled_users_in_public |= user_ids + + # We now go and figure out the new users who share rooms with user entries + # We sleep aggressively here as otherwise it can starve resources. + # We also batch up inserts/updates, but try to avoid too many at once. + to_insert = set() + to_update = set() + count = 0 + for user_id in user_ids: + if count % self.INITIAL_SLEEP_COUNT == 0: + yield sleep(self.INITIAL_SLEEP_MS / 1000.) + + if not self.is_mine_id(user_id): + count += 1 + continue + + if self.store.get_if_app_services_interested_in_user(user_id): + count += 1 + continue + + for other_user_id in user_ids: + if user_id == other_user_id: + continue + + if count % self.INITIAL_SLEEP_COUNT == 0: + yield sleep(self.INITIAL_SLEEP_MS / 1000.) + count += 1 + + user_set = (user_id, other_user_id) + + if user_set in self.initially_handled_users_share_private_room: + continue + + if user_set in self.initially_handled_users_share: + if is_public: + continue + to_update.add(user_set) + else: + to_insert.add(user_set) + + if is_public: + self.initially_handled_users_share.add(user_set) + else: + self.initially_handled_users_share_private_room.add(user_set) + + if len(to_insert) > self.INITIAL_BATCH_SIZE: + yield self.store.add_users_who_share_room( + room_id, not is_public, to_insert, + ) + to_insert.clear() + + if len(to_update) > self.INITIAL_BATCH_SIZE: + yield self.store.update_users_who_share_room( + room_id, not is_public, to_update, + ) + to_update.clear() + + if to_insert: + yield self.store.add_users_who_share_room( + room_id, not is_public, to_insert, + ) + to_insert.clear() + + if to_update: + yield self.store.update_users_who_share_room( + room_id, not is_public, to_update, + ) + to_update.clear() @defer.inlineCallbacks def _handle_deltas(self, deltas): @@ -193,17 +289,19 @@ class UserDirectoyHandler(object): if change is None: # Handle any profile changes - yield self._handle_profile_change(state_key, prev_event_id, event_id) + yield self._handle_profile_change( + state_key, room_id, prev_event_id, event_id, + ) continue if not change: # Need to check if the server left the room entirely, if so # we might need to remove all the users in that room - is_in_room = yield self.state.get_is_host_in_room( + is_in_room = yield self.store.is_host_joined( room_id, self.server_name, ) if not is_in_room: - logger.debug("Server left room: %r", room_id) + logger.info("Server left room: %r", room_id) # Fetch all the users that we marked as being in user # directory due to being in the room and then check if # need to remove those users or not @@ -215,7 +313,7 @@ class UserDirectoyHandler(object): logger.debug("Server is still in room: %r", room_id) if change: # The user joined - event = yield self.store.get_event(event_id) + event = yield self.store.get_event(event_id, allow_none=True) profile = ProfileInfo( avatar_url=event.content.get("avatar_url"), display_name=event.content.get("displayname"), @@ -238,7 +336,7 @@ class UserDirectoyHandler(object): event_id (str|None): The new event after the state change typ (str): Type of the event """ - logger.debug("Handling change for %s", typ) + logger.debug("Handling change for %s: %s", typ, room_id) if typ == EventTypes.RoomHistoryVisibility: change = yield self._get_key_change( @@ -304,12 +402,84 @@ class UserDirectoyHandler(object): room_id ) - if not is_public: - return + if is_public: + row = yield self.store.get_user_in_public_room(user_id) + if not row: + yield self.store.add_users_to_public_room(room_id, [user_id]) + else: + logger.debug("Not adding user to public dir, %r", user_id) - row = yield self.store.get_user_in_public_room(user_id) - if not row: - yield self.store.add_users_to_public_room(room_id, [user_id]) + # Now we update users who share rooms with users. We do this by getting + # all the current users in the room and seeing which aren't already + # marked in the database as sharing with `user_id` + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + + to_insert = set() + to_update = set() + + is_appservice = self.store.get_if_app_services_interested_in_user(user_id) + + # First, if they're our user then we need to update for every user + if self.is_mine_id(user_id) and not is_appservice: + # Returns a map of other_user_id -> shared_private. We only need + # to update mappings if for users that either don't share a room + # already (aren't in the map) or, if the room is private, those that + # only share a public room. + user_ids_shared = yield self.store.get_users_who_share_room_from_dir( + user_id + ) + + for other_user_id in users_with_profile: + if user_id == other_user_id: + continue + + shared_is_private = user_ids_shared.get(other_user_id) + if shared_is_private is True: + # We've already marked in the database they share a private room + continue + elif shared_is_private is False: + # They already share a public room, so only update if this is + # a private room + if not is_public: + to_update.add((user_id, other_user_id)) + elif shared_is_private is None: + # This is the first time they both share a room + to_insert.add((user_id, other_user_id)) + + # Next we need to update for every local user in the room + for other_user_id in users_with_profile: + if user_id == other_user_id: + continue + + is_appservice = self.store.get_if_app_services_interested_in_user( + other_user_id + ) + if self.is_mine_id(other_user_id) and not is_appservice: + shared_is_private = yield self.store.get_if_users_share_a_room( + other_user_id, user_id, + ) + if shared_is_private is True: + # We've already marked in the database they share a private room + continue + elif shared_is_private is False: + # They already share a public room, so only update if this is + # a private room + if not is_public: + to_update.add((other_user_id, user_id)) + elif shared_is_private is None: + # This is the first time they both share a room + to_insert.add((other_user_id, user_id)) + + if to_insert: + yield self.store.add_users_who_share_room( + room_id, not is_public, to_insert, + ) + + if to_update: + yield self.store.update_users_who_share_room( + room_id, not is_public, to_update, + ) @defer.inlineCallbacks def _handle_remove_user(self, room_id, user_id): @@ -327,32 +497,29 @@ class UserDirectoyHandler(object): row = yield self.store.get_user_in_public_room(user_id) update_user_in_public = row and row["room_id"] == room_id - if not update_user_in_public and not update_user_dir: - return - - # XXX: Make this faster? - rooms = yield self.store.get_rooms_for_user(user_id) - for j_room_id in rooms: - if not update_user_in_public and not update_user_dir: - break + if (update_user_in_public or update_user_dir): + # XXX: Make this faster? + rooms = yield self.store.get_rooms_for_user(user_id) + for j_room_id in rooms: + if (not update_user_in_public and not update_user_dir): + break - is_in_room = yield self.state.get_is_host_in_room( - j_room_id, self.server_name, - ) + is_in_room = yield self.store.is_host_joined( + j_room_id, self.server_name, + ) - if not is_in_room: - continue + if not is_in_room: + continue - if update_user_dir: - update_user_dir = False - yield self.store.update_user_in_user_dir(user_id, j_room_id) + if update_user_dir: + update_user_dir = False + yield self.store.update_user_in_user_dir(user_id, j_room_id) - if update_user_in_public: is_public = yield self.store.is_room_world_readable_or_publicly_joinable( j_room_id ) - if is_public: + if update_user_in_public and is_public: yield self.store.update_user_in_public_user_list(user_id, j_room_id) update_user_in_public = False @@ -361,16 +528,59 @@ class UserDirectoyHandler(object): elif update_user_in_public: yield self.store.remove_from_user_in_public_room(user_id) + # Now handle users_who_share_rooms. + + # Get a list of user tuples that were in the DB due to this room and + # users (this includes tuples where the other user matches `user_id`) + user_tuples = yield self.store.get_users_in_share_dir_with_room_id( + user_id, room_id, + ) + + for user_id, other_user_id in user_tuples: + # For each user tuple get a list of rooms that they still share, + # trying to find a private room, and update the entry in the DB + rooms = yield self.store.get_rooms_in_common_for_users(user_id, other_user_id) + + # If they dont share a room anymore, remove the mapping + if not rooms: + yield self.store.remove_user_who_share_room( + user_id, other_user_id, + ) + continue + + found_public_share = None + for j_room_id in rooms: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + j_room_id + ) + + if is_public: + found_public_share = j_room_id + else: + found_public_share = None + yield self.store.update_users_who_share_room( + room_id, not is_public, [(user_id, other_user_id)], + ) + break + + if found_public_share: + yield self.store.update_users_who_share_room( + room_id, not is_public, [(user_id, other_user_id)], + ) + @defer.inlineCallbacks - def _handle_profile_change(self, user_id, prev_event_id, event_id): + def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id): """Check member event changes for any profile changes and update the database if there are. """ if not prev_event_id or not event_id: return - prev_event = yield self.store.get_event(prev_event_id) - event = yield self.store.get_event(event_id) + prev_event = yield self.store.get_event(prev_event_id, allow_none=True) + event = yield self.store.get_event(event_id, allow_none=True) + + if not prev_event or not event: + return if event.membership != Membership.JOIN: return @@ -382,7 +592,9 @@ class UserDirectoyHandler(object): new_avatar = event.content.get("avatar_url") if prev_name != new_name or prev_avatar != new_avatar: - yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar) + yield self.store.update_profile_in_user_dir( + user_id, new_name, new_avatar, room_id, + ) @defer.inlineCallbacks def _get_key_change(self, prev_event_id, event_id, key_name, public_value): |