Diffstat (limited to 'synapse/handlers')
-rw-r--r--   synapse/handlers/_base.py               |   5
-rw-r--r--   synapse/handlers/auth.py                |  99
-rw-r--r--   synapse/handlers/deactivate_account.py  |   1
-rw-r--r--   synapse/handlers/device.py              | 392
-rw-r--r--   synapse/handlers/directory.py           |   8
-rw-r--r--   synapse/handlers/events.py              |   7
-rw-r--r--   synapse/handlers/federation.py          | 140
-rw-r--r--   synapse/handlers/initial_sync.py        |   6
-rw-r--r--   synapse/handlers/message.py             |  27
-rw-r--r--   synapse/handlers/presence.py            |   6
-rw-r--r--   synapse/handlers/profile.py             |  10
-rw-r--r--   synapse/handlers/receipts.py            | 114
-rw-r--r--   synapse/handlers/register.py            |  50
-rw-r--r--   synapse/handlers/room_list.py           |  14
-rw-r--r--   synapse/handlers/room_member.py         |  10
-rw-r--r--   synapse/handlers/state_deltas.py        |  70
-rw-r--r--   synapse/handlers/sync.py                |  86
-rw-r--r--   synapse/handlers/typing.py              |   2
-rw-r--r--   synapse/handlers/user_directory.py      | 423
19 files changed, 804 insertions, 666 deletions
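
The recurring pattern in _base.py, auth.py, and register.py below is the switch from the message-specific ratelimiter.send_message(...) to a generic pair of entry points: can_do_action, which returns whether an action is allowed plus the earliest time it would be, and ratelimit, which raises LimitExceededError instead. A minimal sketch of the token-bucket behaviour those call sites imply (the class internals here are an illustrative reconstruction, not Synapse's actual implementation; the rate numbers are arbitrary):

    import time


    class LimitExceededError(Exception):
        """Stand-in for synapse.api.errors.LimitExceededError."""

        def __init__(self, retry_after_ms):
            super(LimitExceededError, self).__init__("Too Many Requests")
            self.retry_after_ms = retry_after_ms


    class Ratelimiter(object):
        """Illustrative limiter matching the call sites in the diff below."""

        def __init__(self):
            # key -> (accumulated action count, time of last update in seconds)
            self.actions = {}

        def can_do_action(self, key, time_now_s, rate_hz, burst_count, update=True):
            action_count, time_start = self.actions.get(key, (0.0, time_now_s))

            # Let earlier actions drain out of the bucket at rate_hz per second.
            elapsed = time_now_s - time_start
            performed = max(action_count - elapsed * rate_hz, 0.0)

            allowed = performed + 1.0 <= burst_count
            if update and allowed:
                self.actions[key] = (performed + 1.0, time_now_s)

            if allowed:
                time_allowed = time_now_s
            else:
                # Earliest instant at which the bucket will have drained enough.
                time_allowed = time_now_s + (performed + 1.0 - burst_count) / rate_hz
            return allowed, time_allowed

        def ratelimit(self, key, time_now_s, rate_hz, burst_count, update=True):
            allowed, time_allowed = self.can_do_action(
                key, time_now_s, rate_hz, burst_count, update=update,
            )
            if not allowed:
                raise LimitExceededError(
                    retry_after_ms=int(1000 * (time_allowed - time_now_s)),
                )


    limiter = Ratelimiter()
    now = time.time()
    # With burst_count=2, the third action in quick succession is refused:
    print(limiter.can_do_action("@alice:example.com", now, rate_hz=0.17, burst_count=2))
    print(limiter.can_do_action("@alice:example.com", now, rate_hz=0.17, burst_count=2))
    print(limiter.can_do_action("@alice:example.com", now, rate_hz=0.17, burst_count=2))

The two entry points then serve different call sites: can_do_action for callers that build their own error response (as register_with_store does below), ratelimit for callers that simply want to abort (as ratelimit_login_per_account does).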
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 594754cfd8..ac09d03ba9 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -93,9 +93,9 @@ class BaseHandler(object):
         messages_per_second = self.hs.config.rc_messages_per_second
         burst_count = self.hs.config.rc_message_burst_count
 
-        allowed, time_allowed = self.ratelimiter.send_message(
+        allowed, time_allowed = self.ratelimiter.can_do_action(
             user_id, time_now,
-            msg_rate_hz=messages_per_second,
+            rate_hz=messages_per_second,
             burst_count=burst_count,
             update=update,
         )
@@ -165,6 +165,7 @@ class BaseHandler(object):
                     member_event.room_id,
                     "leave",
                     ratelimit=False,
+                    require_consent=False,
                 )
             except Exception as e:
                 logger.exception("Error kicking guest user: %s" % (e,))
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2abd9af94f..4544de821d 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -35,6 +35,7 @@ from synapse.api.errors import (
     StoreError,
     SynapseError,
 )
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
 from synapse.util import logcontext
@@ -99,6 +100,11 @@ class AuthHandler(BaseHandler):
             login_types.append(t)
         self._supported_login_types = login_types
 
+        self._account_ratelimiter = Ratelimiter()
+        self._failed_attempts_ratelimiter = Ratelimiter()
+
+        self._clock = self.hs.get_clock()
+
     @defer.inlineCallbacks
     def validate_user_via_ui_auth(self, requester, request_body, clientip):
         """
@@ -568,7 +574,12 @@ class AuthHandler(BaseHandler):
         Returns:
             defer.Deferred: (unicode) canonical_user_id, or None if zero or
             multiple matches
+
+        Raises:
+            LimitExceededError if the ratelimiter's login requests count for this
+            user is too high to proceed.
         """
+        self.ratelimit_login_per_account(user_id)
         res = yield self._find_user_id_and_pwd_hash(user_id)
         if res is not None:
             defer.returnValue(res[0])
@@ -634,6 +645,8 @@ class AuthHandler(BaseHandler):
             StoreError if there was a problem accessing the database
             SynapseError if there was a problem with the request
             LoginError if there was an authentication problem.
+            LimitExceededError if the ratelimiter's login requests count for this
+            user is too high to proceed.
         """
 
         if username.startswith('@'):
@@ -643,6 +656,8 @@ class AuthHandler(BaseHandler):
                 username, self.hs.hostname
             ).to_string()
 
+        self.ratelimit_login_per_account(qualified_user_id)
+
         login_type = login_submission.get("type")
         known_login_type = False
 
@@ -715,15 +730,58 @@ class AuthHandler(BaseHandler):
         if not known_login_type:
             raise SynapseError(400, "Unknown login type %s" % login_type)
 
-        # unknown username or invalid password. We raise a 403 here, but note
-        # that if we're doing user-interactive login, it turns all LoginErrors
-        # into a 401 anyway.
+        # unknown username or invalid password.
+        self._failed_attempts_ratelimiter.ratelimit(
+            qualified_user_id.lower(), time_now_s=self._clock.time(),
+            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+            update=True,
+        )
+
+        # We raise a 403 here, but note that if we're doing user-interactive
+        # login, it turns all LoginErrors into a 401 anyway.
         raise LoginError(
             403, "Invalid password",
             errcode=Codes.FORBIDDEN
         )
 
     @defer.inlineCallbacks
+    def check_password_provider_3pid(self, medium, address, password):
+        """Check if a password provider is able to validate a third-party login
+
+        Args:
+            medium (str): The medium of the 3pid (ex. email).
+            address (str): The address of the 3pid (ex. jdoe@example.com).
+            password (str): The password of the user.
+
+        Returns:
+            Deferred[(str|None, func|None)]: A tuple of `(user_id,
+            callback)`. If authentication is successful, `user_id` is a `str`
+            containing the authenticated, canonical user ID. `callback` is
+            then either a function to be later run after the server has
+            completed login/registration, or `None`. If authentication was
+            unsuccessful, `user_id` and `callback` are both `None`.
+        """
+        for provider in self.password_providers:
+            if hasattr(provider, "check_3pid_auth"):
+                # This function is able to return a deferred that either
+                # resolves None, meaning authentication failure, or upon
+                # success, to a str (which is the user_id) or a tuple of
+                # (user_id, callback_func), where callback_func should be run
+                # after we've finished everything else
+                result = yield provider.check_3pid_auth(
+                    medium, address, password,
+                )
+                if result:
+                    # Check if the return value is a str or a tuple
+                    if isinstance(result, str):
+                        # If it's a str, set callback function to None
+                        result = (result, None)
+                    defer.returnValue(result)
+
+        defer.returnValue((None, None))
+
+    @defer.inlineCallbacks
     def _check_local_password(self, user_id, password):
         """Authenticate a user against the local password database.
 
@@ -734,7 +792,12 @@ class AuthHandler(BaseHandler):
             user_id (unicode): complete @user:id
             password (unicode): the provided password
         Returns:
-            (unicode) the canonical_user_id, or None if unknown user / bad password
+            Deferred[unicode] the canonical_user_id, or Deferred[None] if
+            unknown user/bad password
+
+        Raises:
+            LimitExceededError if the ratelimiter's login requests count for this
+            user is too high to proceed.
         """
         lookupres = yield self._find_user_id_and_pwd_hash(user_id)
         if not lookupres:
@@ -763,6 +826,7 @@ class AuthHandler(BaseHandler):
             auth_api.validate_macaroon(macaroon, "login", True, user_id)
         except Exception:
             raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
+        self.ratelimit_login_per_account(user_id)
         yield self.auth.check_auth_blocking(user_id)
         defer.returnValue(user_id)
 
@@ -934,6 +998,33 @@ class AuthHandler(BaseHandler):
         else:
             return defer.succeed(False)
 
+    def ratelimit_login_per_account(self, user_id):
+        """Checks whether the process must be stopped because of ratelimiting.
+
+        Checks against two ratelimiters: the generic one for login attempts per
+        account and the one specific to failed attempts.
+
+        Args:
+            user_id (unicode): complete @user:id
+
+        Raises:
+            LimitExceededError if one of the ratelimiters' login requests count
+            for this user is too high to proceed.
+ """ + self._failed_attempts_ratelimiter.ratelimit( + user_id.lower(), time_now_s=self._clock.time(), + rate_hz=self.hs.config.rc_login_failed_attempts.per_second, + burst_count=self.hs.config.rc_login_failed_attempts.burst_count, + update=False, + ) + + self._account_ratelimiter.ratelimit( + user_id.lower(), time_now_s=self._clock.time(), + rate_hz=self.hs.config.rc_login_account.per_second, + burst_count=self.hs.config.rc_login_account.burst_count, + update=True, + ) + @attr.s class MacaroonGenerator(object): diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 75fe50c42c..97d3f31d98 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -164,6 +164,7 @@ class DeactivateAccountHandler(BaseHandler): room_id, "leave", ratelimit=False, + require_consent=False, ) except Exception: logger.exception( diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index c708c35d4d..b398848079 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -37,13 +37,185 @@ from ._base import BaseHandler logger = logging.getLogger(__name__) -class DeviceHandler(BaseHandler): +class DeviceWorkerHandler(BaseHandler): def __init__(self, hs): - super(DeviceHandler, self).__init__(hs) + super(DeviceWorkerHandler, self).__init__(hs) self.hs = hs self.state = hs.get_state_handler() self._auth_handler = hs.get_auth_handler() + + @defer.inlineCallbacks + def get_devices_by_user(self, user_id): + """ + Retrieve the given user's devices + + Args: + user_id (str): + Returns: + defer.Deferred: list[dict[str, X]]: info on each device + """ + + device_map = yield self.store.get_devices_by_user(user_id) + + ips = yield self.store.get_last_client_ip_by_device( + user_id, device_id=None + ) + + devices = list(device_map.values()) + for device in devices: + _update_device_from_client_ips(device, ips) + + defer.returnValue(devices) + + @defer.inlineCallbacks + def get_device(self, user_id, device_id): + """ Retrieve the given device + + Args: + user_id (str): + device_id (str): + + Returns: + defer.Deferred: dict[str, X]: info on the device + Raises: + errors.NotFoundError: if the device was not found + """ + try: + device = yield self.store.get_device(user_id, device_id) + except errors.StoreError: + raise errors.NotFoundError + ips = yield self.store.get_last_client_ip_by_device( + user_id, device_id, + ) + _update_device_from_client_ips(device, ips) + defer.returnValue(device) + + @measure_func("device.get_user_ids_changed") + @defer.inlineCallbacks + def get_user_ids_changed(self, user_id, from_token): + """Get list of users that have had the devices updated, or have newly + joined a room, that `user_id` may be interested in. 
+ + Args: + user_id (str) + from_token (StreamToken) + """ + now_room_key = yield self.store.get_room_events_max_id() + + room_ids = yield self.store.get_rooms_for_user(user_id) + + # First we check if any devices have changed + changed = yield self.store.get_user_whose_devices_changed( + from_token.device_list_key + ) + + # Then work out if any users have since joined + rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) + + member_events = yield self.store.get_membership_changes_for_user( + user_id, from_token.room_key, now_room_key, + ) + rooms_changed.update(event.room_id for event in member_events) + + stream_ordering = RoomStreamToken.parse_stream_token( + from_token.room_key + ).stream + + possibly_changed = set(changed) + possibly_left = set() + for room_id in rooms_changed: + current_state_ids = yield self.store.get_current_state_ids(room_id) + + # The user may have left the room + # TODO: Check if they actually did or if we were just invited. + if room_id not in room_ids: + for key, event_id in iteritems(current_state_ids): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_left.add(state_key) + continue + + # Fetch the current state at the time. + try: + event_ids = yield self.store.get_forward_extremeties_for_room( + room_id, stream_ordering=stream_ordering + ) + except errors.StoreError: + # we have purged the stream_ordering index since the stream + # ordering: treat it the same as a new room + event_ids = [] + + # special-case for an empty prev state: include all members + # in the changed list + if not event_ids: + for key, event_id in iteritems(current_state_ids): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_changed.add(state_key) + continue + + current_member_id = current_state_ids.get((EventTypes.Member, user_id)) + if not current_member_id: + continue + + # mapping from event_id -> state_dict + prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) + + # Check if we've joined the room? If so we just blindly add all the users to + # the "possibly changed" users. + for state_dict in itervalues(prev_state_ids): + member_event = state_dict.get((EventTypes.Member, user_id), None) + if not member_event or member_event != current_member_id: + for key, event_id in iteritems(current_state_ids): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_changed.add(state_key) + break + + # If there has been any change in membership, include them in the + # possibly changed list. We'll check if they are joined below, + # and we're not toooo worried about spuriously adding users. + for key, event_id in iteritems(current_state_ids): + etype, state_key = key + if etype != EventTypes.Member: + continue + + # check if this member has changed since any of the extremities + # at the stream_ordering, and add them to the list if so. 
+ for state_dict in itervalues(prev_state_ids): + prev_event_id = state_dict.get(key, None) + if not prev_event_id or prev_event_id != event_id: + if state_key != user_id: + possibly_changed.add(state_key) + break + + if possibly_changed or possibly_left: + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) + + # Take the intersection of the users whose devices may have changed + # and those that actually still share a room with the user + possibly_joined = possibly_changed & users_who_share_room + possibly_left = (possibly_changed | possibly_left) - users_who_share_room + else: + possibly_joined = [] + possibly_left = [] + + defer.returnValue({ + "changed": list(possibly_joined), + "left": list(possibly_left), + }) + + +class DeviceHandler(DeviceWorkerHandler): + def __init__(self, hs): + super(DeviceHandler, self).__init__(hs) + self.federation_sender = hs.get_federation_sender() self._edu_updater = DeviceListEduUpdater(hs, self) @@ -104,52 +276,6 @@ class DeviceHandler(BaseHandler): raise errors.StoreError(500, "Couldn't generate a device ID.") @defer.inlineCallbacks - def get_devices_by_user(self, user_id): - """ - Retrieve the given user's devices - - Args: - user_id (str): - Returns: - defer.Deferred: list[dict[str, X]]: info on each device - """ - - device_map = yield self.store.get_devices_by_user(user_id) - - ips = yield self.store.get_last_client_ip_by_device( - user_id, device_id=None - ) - - devices = list(device_map.values()) - for device in devices: - _update_device_from_client_ips(device, ips) - - defer.returnValue(devices) - - @defer.inlineCallbacks - def get_device(self, user_id, device_id): - """ Retrieve the given device - - Args: - user_id (str): - device_id (str): - - Returns: - defer.Deferred: dict[str, X]: info on the device - Raises: - errors.NotFoundError: if the device was not found - """ - try: - device = yield self.store.get_device(user_id, device_id) - except errors.StoreError: - raise errors.NotFoundError - ips = yield self.store.get_last_client_ip_by_device( - user_id, device_id, - ) - _update_device_from_client_ips(device, ips) - defer.returnValue(device) - - @defer.inlineCallbacks def delete_device(self, user_id, device_id): """ Delete the given device @@ -276,6 +402,12 @@ class DeviceHandler(BaseHandler): user_id, device_ids, list(hosts) ) + for device_id in device_ids: + logger.debug( + "Notifying about update %r/%r, ID: %r", user_id, device_id, + position, + ) + room_ids = yield self.store.get_rooms_for_user(user_id) yield self.notifier.on_new_event( @@ -283,130 +415,10 @@ class DeviceHandler(BaseHandler): ) if hosts: - logger.info("Sending device list update notif to: %r", hosts) + logger.info("Sending device list update notif for %r to: %r", user_id, hosts) for host in hosts: self.federation_sender.send_device_messages(host) - @measure_func("device.get_user_ids_changed") - @defer.inlineCallbacks - def get_user_ids_changed(self, user_id, from_token): - """Get list of users that have had the devices updated, or have newly - joined a room, that `user_id` may be interested in. 
- - Args: - user_id (str) - from_token (StreamToken) - """ - now_token = yield self.hs.get_event_sources().get_current_token() - - room_ids = yield self.store.get_rooms_for_user(user_id) - - # First we check if any devices have changed - changed = yield self.store.get_user_whose_devices_changed( - from_token.device_list_key - ) - - # Then work out if any users have since joined - rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) - - member_events = yield self.store.get_membership_changes_for_user( - user_id, from_token.room_key, now_token.room_key - ) - rooms_changed.update(event.room_id for event in member_events) - - stream_ordering = RoomStreamToken.parse_stream_token( - from_token.room_key - ).stream - - possibly_changed = set(changed) - possibly_left = set() - for room_id in rooms_changed: - current_state_ids = yield self.store.get_current_state_ids(room_id) - - # The user may have left the room - # TODO: Check if they actually did or if we were just invited. - if room_id not in room_ids: - for key, event_id in iteritems(current_state_ids): - etype, state_key = key - if etype != EventTypes.Member: - continue - possibly_left.add(state_key) - continue - - # Fetch the current state at the time. - try: - event_ids = yield self.store.get_forward_extremeties_for_room( - room_id, stream_ordering=stream_ordering - ) - except errors.StoreError: - # we have purged the stream_ordering index since the stream - # ordering: treat it the same as a new room - event_ids = [] - - # special-case for an empty prev state: include all members - # in the changed list - if not event_ids: - for key, event_id in iteritems(current_state_ids): - etype, state_key = key - if etype != EventTypes.Member: - continue - possibly_changed.add(state_key) - continue - - current_member_id = current_state_ids.get((EventTypes.Member, user_id)) - if not current_member_id: - continue - - # mapping from event_id -> state_dict - prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) - - # Check if we've joined the room? If so we just blindly add all the users to - # the "possibly changed" users. - for state_dict in itervalues(prev_state_ids): - member_event = state_dict.get((EventTypes.Member, user_id), None) - if not member_event or member_event != current_member_id: - for key, event_id in iteritems(current_state_ids): - etype, state_key = key - if etype != EventTypes.Member: - continue - possibly_changed.add(state_key) - break - - # If there has been any change in membership, include them in the - # possibly changed list. We'll check if they are joined below, - # and we're not toooo worried about spuriously adding users. - for key, event_id in iteritems(current_state_ids): - etype, state_key = key - if etype != EventTypes.Member: - continue - - # check if this member has changed since any of the extremities - # at the stream_ordering, and add them to the list if so. 
- for state_dict in itervalues(prev_state_ids): - prev_event_id = state_dict.get(key, None) - if not prev_event_id or prev_event_id != event_id: - if state_key != user_id: - possibly_changed.add(state_key) - break - - if possibly_changed or possibly_left: - users_who_share_room = yield self.store.get_users_who_share_room_with_user( - user_id - ) - - # Take the intersection of the users whose devices may have changed - # and those that actually still share a room with the user - possibly_joined = possibly_changed & users_who_share_room - possibly_left = (possibly_changed | possibly_left) - users_who_share_room - else: - possibly_joined = [] - possibly_left = [] - - defer.returnValue({ - "changed": list(possibly_joined), - "left": list(possibly_left), - }) - @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) @@ -473,15 +485,26 @@ class DeviceListEduUpdater(object): if get_domain_from_id(user_id) != origin: # TODO: Raise? - logger.warning("Got device list update edu for %r from %r", user_id, origin) + logger.warning( + "Got device list update edu for %r/%r from %r", + user_id, device_id, origin, + ) return room_ids = yield self.store.get_rooms_for_user(user_id) if not room_ids: # We don't share any rooms with this user. Ignore update, as we # probably won't get any further updates. + logger.warning( + "Got device list update edu for %r/%r, but don't share a room", + user_id, device_id, + ) return + logger.debug( + "Received device list update for %r/%r", user_id, device_id, + ) + self._pending_updates.setdefault(user_id, []).append( (device_id, stream_id, prev_ids, edu_content) ) @@ -499,10 +522,18 @@ class DeviceListEduUpdater(object): # This can happen since we batch updates return + for device_id, stream_id, prev_ids, content in pending_updates: + logger.debug( + "Handling update %r/%r, ID: %r, prev: %r ", + user_id, device_id, stream_id, prev_ids, + ) + # Given a list of updates we check if we need to resync. This # happens if we've missed updates. resync = yield self._need_to_do_resync(user_id, pending_updates) + logger.debug("Need to re-sync devices for %r? %r", user_id, resync) + if resync: # Fetch all devices for the user. origin = get_domain_from_id(user_id) @@ -555,11 +586,21 @@ class DeviceListEduUpdater(object): ) devices = [] + for device in devices: + logger.debug( + "Handling resync update %r/%r, ID: %r", + user_id, device["device_id"], stream_id, + ) + yield self.store.update_remote_device_list_cache( user_id, devices, stream_id, ) device_ids = [device["device_id"] for device in devices] yield self.device_handler.notify_device_update(user_id, device_ids) + + # We clobber the seen updates since we've re-synced from a given + # point. 
+ self._seen_updates[user_id] = set([stream_id]) else: # Simply update the single device, since we know that is the only # change (because of the single prev_id matching the current cache) @@ -572,9 +613,9 @@ class DeviceListEduUpdater(object): user_id, [device_id for device_id, _, _, _ in pending_updates] ) - self._seen_updates.setdefault(user_id, set()).update( - stream_id for _, stream_id, _, _ in pending_updates - ) + self._seen_updates.setdefault(user_id, set()).update( + stream_id for _, stream_id, _, _ in pending_updates + ) @defer.inlineCallbacks def _need_to_do_resync(self, user_id, updates): @@ -587,6 +628,11 @@ class DeviceListEduUpdater(object): user_id ) + logger.debug( + "Current extremity for %r: %r", + user_id, extremity, + ) + stream_id_in_updates = set() # stream_ids in updates list for _, stream_id, prev_ids, _ in updates: if not prev_ids: diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 8b113307d2..fe128d9c88 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -44,6 +44,7 @@ class DirectoryHandler(BaseHandler): self.appservice_handler = hs.get_application_service_handler() self.event_creation_handler = hs.get_event_creation_handler() self.config = hs.config + self.enable_room_list_search = hs.config.enable_room_list_search self.federation = hs.get_federation_client() hs.get_federation_registry().register_query_handler( @@ -411,6 +412,13 @@ class DirectoryHandler(BaseHandler): if visibility not in ["public", "private"]: raise SynapseError(400, "Invalid visibility setting") + if visibility == "public" and not self.enable_room_list_search: + # The room list has been disabled. + raise AuthError( + 403, + "This user is not permitted to publish rooms to the room list" + ) + room = yield self.store.get_room(room_id) if room is None: raise SynapseError(400, "Unknown room") diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index f772e62c28..d883e98381 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -19,7 +19,7 @@ import random from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import AuthError +from synapse.api.errors import AuthError, SynapseError from synapse.events import EventBase from synapse.events.utils import serialize_event from synapse.types import UserID @@ -61,6 +61,11 @@ class EventStreamHandler(BaseHandler): If `only_keys` is not None, events from keys will be sent down. """ + if room_id: + blocked = yield self.store.is_room_blocked(room_id) + if blocked: + raise SynapseError(403, "This room has been blocked on this server") + # send any outstanding server notices to the user. 
         yield self._server_notices_sender.on_user_syncing(auth_user_id)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f80486102a..9eaf2d3e18 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -45,6 +45,7 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.crypto.event_signing import compute_event_signature
+from synapse.event_auth import auth_types_for_event
 from synapse.events.validator import EventValidator
 from synapse.replication.http.federation import (
     ReplicationCleanRoomRestServlet,
@@ -858,6 +859,52 @@ class FederationHandler(BaseHandler):
             logger.debug("Not backfilling as no extremeties found.")
             return
 
+        # We only want to paginate if we can actually see the events we'll get,
+        # as otherwise we'll just spend a lot of resources to get redacted
+        # events.
+        #
+        # We do this by filtering all the backwards extremities and seeing if
+        # any remain. Given we don't have the extremity events themselves, we
+        # need to actually check the events that reference them.
+        #
+        # *Note*: the spec wants us to keep backfilling until we reach the start
+        # of the room in case we are allowed to see some of the history. However
+        # in practice that causes more issues than it's worth, as a) it's
+        # relatively rare for there to be any visible history and b) even when
+        # there is, it's often sufficiently long ago that clients would stop
+        # attempting to paginate before backfill reached the visible history.
+        #
+        # TODO: If we do a backfill then we should filter the backwards
+        # extremities to only include those that point to visible portions of
+        # history.
+        #
+        # TODO: Correctly handle the case where we are allowed to see the
+        # forward event but not the backward extremity, e.g. in the case of
+        # initial join of the server where we are allowed to see the join
+        # event but not anything before it. This would require looking at the
+        # state *before* the event, ignoring the special casing certain event
+        # types have.
+
+        forward_events = yield self.store.get_successor_events(
+            list(extremities),
+        )
+
+        extremities_events = yield self.store.get_events(
+            forward_events,
+            check_redacted=False,
+            get_prev_content=False,
+        )
+
+        # We set `check_history_visibility_only` as we might otherwise get false
+        # positives from users having been erased.
+        filtered_extremities = yield filter_events_for_server(
+            self.store, self.server_name, list(extremities_events.values()),
+            redact=False, check_history_visibility_only=True,
+        )
+
+        if not filtered_extremities:
+            defer.returnValue(False)
+
         # Check if we reached a point where we should start backfilling.
sorted_extremeties_tuple = sorted( extremities.items(), @@ -1582,6 +1629,7 @@ class FederationHandler(BaseHandler): origin, event, state=state, auth_events=auth_events, + backfilled=backfilled, ) # reraise does not allow inlineCallbacks to preserve the stacktrace, so we @@ -1626,6 +1674,7 @@ class FederationHandler(BaseHandler): event, state=ev_info.get("state"), auth_events=ev_info.get("auth_events"), + backfilled=backfilled, ) defer.returnValue(res) @@ -1748,7 +1797,7 @@ class FederationHandler(BaseHandler): ) @defer.inlineCallbacks - def _prep_event(self, origin, event, state=None, auth_events=None): + def _prep_event(self, origin, event, state, auth_events, backfilled): """ Args: @@ -1756,6 +1805,7 @@ class FederationHandler(BaseHandler): event: state: auth_events: + backfilled (bool) Returns: Deferred, which resolves to synapse.events.snapshot.EventContext @@ -1797,12 +1847,100 @@ class FederationHandler(BaseHandler): context.rejected = RejectedReason.AUTH_ERROR + if not context.rejected: + yield self._check_for_soft_fail(event, state, backfilled) + if event.type == EventTypes.GuestAccess and not context.rejected: yield self.maybe_kick_guest_users(event) defer.returnValue(context) @defer.inlineCallbacks + def _check_for_soft_fail(self, event, state, backfilled): + """Checks if we should soft fail the event, if so marks the event as + such. + + Args: + event (FrozenEvent) + state (dict|None): The state at the event if we don't have all the + event's prev events + backfilled (bool): Whether the event is from backfill + + Returns: + Deferred + """ + # For new (non-backfilled and non-outlier) events we check if the event + # passes auth based on the current state. If it doesn't then we + # "soft-fail" the event. + do_soft_fail_check = not backfilled and not event.internal_metadata.is_outlier() + if do_soft_fail_check: + extrem_ids = yield self.store.get_latest_event_ids_in_room( + event.room_id, + ) + + extrem_ids = set(extrem_ids) + prev_event_ids = set(event.prev_event_ids()) + + if extrem_ids == prev_event_ids: + # If they're the same then the current state is the same as the + # state at the event, so no point rechecking auth for soft fail. + do_soft_fail_check = False + + if do_soft_fail_check: + room_version = yield self.store.get_room_version(event.room_id) + + # Calculate the "current state". + if state is not None: + # If we're explicitly given the state then we won't have all the + # prev events, and so we have a gap in the graph. In this case + # we want to be a little careful as we might have been down for + # a while and have an incorrect view of the current state, + # however we still want to do checks as gaps are easy to + # maliciously manufacture. + # + # So we use a "current state" that is actually a state + # resolution across the current forward extremities and the + # given state at the event. This should correctly handle cases + # like bans, especially with state res v2. 
+
+                state_sets = yield self.store.get_state_groups(
+                    event.room_id, extrem_ids,
+                )
+                state_sets = list(state_sets.values())
+                state_sets.append(state)
+                current_state_ids = yield self.state_handler.resolve_events(
+                    room_version, state_sets, event,
+                )
+                current_state_ids = {
+                    k: e.event_id for k, e in iteritems(current_state_ids)
+                }
+            else:
+                current_state_ids = yield self.state_handler.get_current_state_ids(
+                    event.room_id, latest_event_ids=extrem_ids,
+                )
+
+            # Now check if the event passes auth against said current state
+            auth_types = auth_types_for_event(event)
+            current_state_ids = [
+                e for k, e in iteritems(current_state_ids)
+                if k in auth_types
+            ]
+
+            current_auth_events = yield self.store.get_events(current_state_ids)
+            current_auth_events = {
+                (e.type, e.state_key): e for e in current_auth_events.values()
+            }
+
+            try:
+                self.auth.check(room_version, event, auth_events=current_auth_events)
+            except AuthError as e:
+                logger.warn(
+                    "Failed current state auth resolution for %r because %s",
+                    event, e,
+                )
+                event.internal_metadata.soft_failed = True
+
+    @defer.inlineCallbacks
     def on_query_auth(self, origin, event_id, room_id, remote_auth_chain, rejects,
                       missing):
         in_room = yield self.auth.check_host_in_room(
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 563bb3cea3..7dfae78db0 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -18,7 +18,7 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes
+from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.handlers.presence import format_user_presence_state
@@ -262,6 +262,10 @@ class InitialSyncHandler(BaseHandler):
             A JSON serialisable dict with the snapshot of the room.
         """
 
+        blocked = yield self.store.is_room_blocked(room_id)
+        if blocked:
+            raise SynapseError(403, "This room has been blocked on this server")
+
         user_id = requester.user.to_string()
 
         membership, member_event_id = yield self._check_in_room_or_world_readable(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3981fe69ce..9b41c7b205 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -243,12 +243,19 @@ class EventCreationHandler(object):
 
         self.spam_checker = hs.get_spam_checker()
 
-        if self.config.block_events_without_consent_error is not None:
+        self._block_events_without_consent_error = (
+            self.config.block_events_without_consent_error
+        )
+
+        # we need to construct a ConsentURIBuilder here, as it checks the
+        # necessary config options, but *only* if we have a configuration for
+        # which we are going to need it.
+        if self._block_events_without_consent_error:
             self._consent_uri_builder = ConsentURIBuilder(self.config)
 
     @defer.inlineCallbacks
     def create_event(self, requester, event_dict, token_id=None, txn_id=None,
-                     prev_events_and_hashes=None):
+                     prev_events_and_hashes=None, require_consent=True):
         """
         Given a dict from a client, create a new event.
@@ -269,6 +276,9 @@ class EventCreationHandler(object):
             where *hashes* is a map from algorithm to hash.
 
             If None, they will be requested from the database.
+
+            require_consent (bool): Whether to check if the requester has
+            consented to the privacy policy.
Raises: ResourceLimitError if server is blocked to some resource being exceeded @@ -310,7 +320,7 @@ class EventCreationHandler(object): ) is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester) - if not is_exempt: + if require_consent and not is_exempt: yield self.assert_accepted_privacy_policy(requester) if token_id is not None: @@ -378,7 +388,7 @@ class EventCreationHandler(object): Raises: ConsentNotGivenError: if the user has not given consent yet """ - if self.config.block_events_without_consent_error is None: + if self._block_events_without_consent_error is None: return # exempt AS users from needing consent @@ -405,7 +415,7 @@ class EventCreationHandler(object): consent_uri = self._consent_uri_builder.build_user_consent_uri( requester.user.localpart, ) - msg = self.config.block_events_without_consent_error % { + msg = self._block_events_without_consent_error % { 'consent_uri': consent_uri, } raise ConsentNotGivenError( @@ -436,10 +446,11 @@ class EventCreationHandler(object): if event.is_state(): prev_state = yield self.deduplicate_state_event(event, context) - logger.info( - "Not bothering to persist duplicate state event %s", event.event_id, - ) if prev_state is not None: + logger.info( + "Not bothering to persist state event %s duplicated by %s", + event.event_id, prev_state.event_id, + ) defer.returnValue(prev_state) yield self.handle_new_client_event( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index ba3856674d..37e87fc054 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -816,7 +816,7 @@ class PresenceHandler(object): if self.is_mine(observed_user): yield self.invite_presence(observed_user, observer_user) else: - yield self.federation.send_edu( + yield self.federation.build_and_send_edu( destination=observed_user.domain, edu_type="m.presence_invite", content={ @@ -836,7 +836,7 @@ class PresenceHandler(object): if self.is_mine(observer_user): yield self.accept_presence(observed_user, observer_user) else: - self.federation.send_edu( + self.federation.build_and_send_edu( destination=observer_user.domain, edu_type="m.presence_accept", content={ @@ -848,7 +848,7 @@ class PresenceHandler(object): state_dict = yield self.get_state(observed_user, as_event=False) state_dict = format_user_presence_state(state_dict, self.clock.time_msec()) - self.federation.send_edu( + self.federation.build_and_send_edu( destination=observer_user.domain, edu_type="m.presence", content={ diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 1dfbde84fd..a65c98ff5c 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -147,8 +147,14 @@ class BaseProfileHandler(BaseHandler): @defer.inlineCallbacks def set_displayname(self, target_user, requester, new_displayname, by_admin=False): - """target_user is the user whose displayname is to be changed; - auth_user is the user attempting to make this change.""" + """Set the displayname of a user + + Args: + target_user (UserID): the user whose displayname is to be changed. + requester (Requester): The user attempting to make this change. + new_displayname (str): The displayname to give this user. + by_admin (bool): Whether this change was made by an administrator. 
+ """ if not self.hs.is_mine(target_user): raise SynapseError(400, "User is not hosted on this Home Server") diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 696469732c..274d2946ad 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -16,10 +16,8 @@ import logging from twisted.internet import defer -from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import get_domain_from_id - -from ._base import BaseHandler +from synapse.handlers._base import BaseHandler +from synapse.types import ReadReceipt logger = logging.getLogger(__name__) @@ -39,42 +37,17 @@ class ReceiptsHandler(BaseHandler): self.state = hs.get_state_handler() @defer.inlineCallbacks - def received_client_receipt(self, room_id, receipt_type, user_id, - event_id): - """Called when a client tells us a local user has read up to the given - event_id in the room. - """ - receipt = { - "room_id": room_id, - "receipt_type": receipt_type, - "user_id": user_id, - "event_ids": [event_id], - "data": { - "ts": int(self.clock.time_msec()), - } - } - - is_new = yield self._handle_new_receipts([receipt]) - - if is_new: - # fire off a process in the background to send the receipt to - # remote servers - run_as_background_process( - 'push_receipts_to_remotes', self._push_remotes, receipt - ) - - @defer.inlineCallbacks def _received_remote_receipt(self, origin, content): """Called when we receive an EDU of type m.receipt from a remote HS. """ receipts = [ - { - "room_id": room_id, - "receipt_type": receipt_type, - "user_id": user_id, - "event_ids": user_values["event_ids"], - "data": user_values.get("data", {}), - } + ReadReceipt( + room_id=room_id, + receipt_type=receipt_type, + user_id=user_id, + event_ids=user_values["event_ids"], + data=user_values.get("data", {}), + ) for room_id, room_values in content.items() for receipt_type, users in room_values.items() for user_id, user_values in users.items() @@ -90,14 +63,12 @@ class ReceiptsHandler(BaseHandler): max_batch_id = None for receipt in receipts: - room_id = receipt["room_id"] - receipt_type = receipt["receipt_type"] - user_id = receipt["user_id"] - event_ids = receipt["event_ids"] - data = receipt["data"] - res = yield self.store.insert_receipt( - room_id, receipt_type, user_id, event_ids, data + receipt.room_id, + receipt.receipt_type, + receipt.user_id, + receipt.event_ids, + receipt.data, ) if not res: @@ -115,7 +86,7 @@ class ReceiptsHandler(BaseHandler): # no new receipts defer.returnValue(False) - affected_room_ids = list(set([r["room_id"] for r in receipts])) + affected_room_ids = list(set([r.room_id for r in receipts])) self.notifier.on_new_event( "receipt_key", max_batch_id, rooms=affected_room_ids @@ -128,43 +99,26 @@ class ReceiptsHandler(BaseHandler): defer.returnValue(True) @defer.inlineCallbacks - def _push_remotes(self, receipt): - """Given a receipt, works out which remote servers should be - poked and pokes them. + def received_client_receipt(self, room_id, receipt_type, user_id, + event_id): + """Called when a client tells us a local user has read up to the given + event_id in the room. """ - try: - # TODO: optimise this to move some of the work to the workers. 
- room_id = receipt["room_id"] - receipt_type = receipt["receipt_type"] - user_id = receipt["user_id"] - event_ids = receipt["event_ids"] - data = receipt["data"] - - users = yield self.state.get_current_user_in_room(room_id) - remotedomains = set(get_domain_from_id(u) for u in users) - remotedomains = remotedomains.copy() - remotedomains.discard(self.server_name) - - logger.debug("Sending receipt to: %r", remotedomains) - - for domain in remotedomains: - self.federation.send_edu( - destination=domain, - edu_type="m.receipt", - content={ - room_id: { - receipt_type: { - user_id: { - "event_ids": event_ids, - "data": data, - } - } - }, - }, - key=(room_id, receipt_type, user_id), - ) - except Exception: - logger.exception("Error pushing receipts to remote servers") + receipt = ReadReceipt( + room_id=room_id, + receipt_type=receipt_type, + user_id=user_id, + event_ids=[event_id], + data={ + "ts": int(self.clock.time_msec()), + }, + ) + + is_new = yield self._handle_new_receipts([receipt]) + if not is_new: + return + + yield self.federation.send_read_receipt(receipt) @defer.inlineCallbacks def get_receipts_for_room(self, room_id, to_key): diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index c0e06929bd..58940e0320 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -23,7 +23,9 @@ from synapse.api.constants import LoginType from synapse.api.errors import ( AuthError, Codes, + ConsentNotGivenError, InvalidCaptchaError, + LimitExceededError, RegistrationError, SynapseError, ) @@ -60,6 +62,7 @@ class RegistrationHandler(BaseHandler): self.user_directory_handler = hs.get_user_directory_handler() self.captcha_client = CaptchaServerHttpClient(hs) self.identity_handler = self.hs.get_handlers().identity_handler + self.ratelimiter = hs.get_registration_ratelimiter() self._next_generated_user_id = None @@ -149,6 +152,7 @@ class RegistrationHandler(BaseHandler): threepid=None, user_type=None, default_display_name=None, + address=None, ): """Registers a new client on the server. @@ -167,6 +171,7 @@ class RegistrationHandler(BaseHandler): api.constants.UserTypes, or None for a normal user. default_display_name (unicode|None): if set, the new user's displayname will be set to this. Defaults to 'localpart'. + address (str|None): the IP address used to perform the registration. Returns: A tuple of (user_id, access_token). 
Raises: @@ -206,7 +211,7 @@ class RegistrationHandler(BaseHandler): token = None if generate_token: token = self.macaroon_gen.generate_access_token(user_id) - yield self._register_with_store( + yield self.register_with_store( user_id=user_id, token=token, password_hash=password_hash, @@ -215,6 +220,7 @@ class RegistrationHandler(BaseHandler): create_profile_with_displayname=default_display_name, admin=admin, user_type=user_type, + address=address, ) if self.hs.config.user_directory_search_all_users: @@ -238,12 +244,13 @@ class RegistrationHandler(BaseHandler): if default_display_name is None: default_display_name = localpart try: - yield self._register_with_store( + yield self.register_with_store( user_id=user_id, token=token, password_hash=password_hash, make_guest=make_guest, create_profile_with_displayname=default_display_name, + address=address, ) except SynapseError: # if user id is taken, just generate another @@ -305,6 +312,10 @@ class RegistrationHandler(BaseHandler): ) else: yield self._join_user_to_room(fake_requester, r) + except ConsentNotGivenError as e: + # Technically not necessary to pull out this error though + # moving away from bare excepts is a good thing to do. + logger.error("Failed to join new user to %r: %r", r, e) except Exception as e: logger.error("Failed to join new user to %r: %r", r, e) @@ -337,7 +348,7 @@ class RegistrationHandler(BaseHandler): user_id, allowed_appservice=service ) - yield self._register_with_store( + yield self.register_with_store( user_id=user_id, password_hash="", appservice_id=service_id, @@ -513,7 +524,7 @@ class RegistrationHandler(BaseHandler): token = self.macaroon_gen.generate_access_token(user_id) if need_register: - yield self._register_with_store( + yield self.register_with_store( user_id=user_id, token=token, password_hash=password_hash, @@ -590,10 +601,10 @@ class RegistrationHandler(BaseHandler): ratelimit=False, ) - def _register_with_store(self, user_id, token=None, password_hash=None, - was_guest=False, make_guest=False, appservice_id=None, - create_profile_with_displayname=None, admin=False, - user_type=None): + def register_with_store(self, user_id, token=None, password_hash=None, + was_guest=False, make_guest=False, appservice_id=None, + create_profile_with_displayname=None, admin=False, + user_type=None, address=None): """Register user in the datastore. Args: @@ -612,10 +623,26 @@ class RegistrationHandler(BaseHandler): admin (boolean): is an admin user? user_type (str|None): type of user. One of the values from api.constants.UserTypes, or None for a normal user. + address (str|None): the IP address used to perform the registration. 
Returns: Deferred """ + # Don't rate limit for app services + if appservice_id is None and address is not None: + time_now = self.clock.time() + + allowed, time_allowed = self.ratelimiter.can_do_action( + address, time_now_s=time_now, + rate_hz=self.hs.config.rc_registration.per_second, + burst_count=self.hs.config.rc_registration.burst_count, + ) + + if not allowed: + raise LimitExceededError( + retry_after_ms=int(1000 * (time_allowed - time_now)), + ) + if self.hs.config.worker_app: return self._register_client( user_id=user_id, @@ -627,6 +654,7 @@ class RegistrationHandler(BaseHandler): create_profile_with_displayname=create_profile_with_displayname, admin=admin, user_type=user_type, + address=address, ) else: return self.store.register( @@ -693,9 +721,9 @@ class RegistrationHandler(BaseHandler): access_token (str|None): The access token of the newly logged in device, or None if `inhibit_login` enabled. bind_email (bool): Whether to bind the email with the identity - server + server. bind_msisdn (bool): Whether to bind the msisdn with the identity - server + server. """ if self.hs.config.worker_app: yield self._post_registration_client( @@ -737,7 +765,7 @@ class RegistrationHandler(BaseHandler): """A user consented to the terms on registration Args: - user_id (str): The user ID that consented + user_id (str): The user ID that consented. consent_version (str): version of the policy the user has consented to. """ diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index afa508d729..d6c9d56007 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -44,6 +44,7 @@ EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None) class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) + self.enable_room_list_search = hs.config.enable_room_list_search self.response_cache = ResponseCache(hs, "room_list") self.remote_response_cache = ResponseCache(hs, "remote_room_list", timeout_ms=30 * 1000) @@ -66,10 +67,17 @@ class RoomListHandler(BaseHandler): appservice and network id to use an appservice specific one. Setting to None returns all public rooms across all lists. """ + if not self.enable_room_list_search: + return defer.succeed({ + "chunk": [], + "total_room_count_estimate": 0, + }) + logger.info( "Getting public room list: limit=%r, since=%r, search=%r, network=%r", limit, since_token, bool(search_filter), network_tuple, ) + if search_filter: # We explicitly don't bother caching searches or requests for # appservice specific lists. 
@@ -441,6 +449,12 @@ class RoomListHandler(BaseHandler): def get_remote_public_room_list(self, server_name, limit=None, since_token=None, search_filter=None, include_all_networks=False, third_party_instance_id=None,): + if not self.enable_room_list_search: + defer.returnValue({ + "chunk": [], + "total_room_count_estimate": 0, + }) + if search_filter: # We currently don't support searching across federation, so we have # to do it manually without pagination diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 190ea2c7b1..71ce5b54e5 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -160,6 +160,7 @@ class RoomMemberHandler(object): txn_id=None, ratelimit=True, content=None, + require_consent=True, ): user_id = target.to_string() @@ -185,6 +186,7 @@ class RoomMemberHandler(object): token_id=requester.access_token_id, txn_id=txn_id, prev_events_and_hashes=prev_events_and_hashes, + require_consent=require_consent, ) # Check if this event matches the previous membership event for the user. @@ -232,6 +234,10 @@ class RoomMemberHandler(object): self.copy_room_tags_and_direct_to_room( predecessor["room_id"], room_id, user_id, ) + # Move over old push rules + self.store.move_push_rules_from_room_to_room_for_user( + predecessor["room_id"], room_id, user_id, + ) elif event.membership == Membership.LEAVE: if prev_member_event_id: prev_member_event = yield self.store.get_event(prev_member_event_id) @@ -301,6 +307,7 @@ class RoomMemberHandler(object): third_party_signed=None, ratelimit=True, content=None, + require_consent=True, ): key = (room_id,) @@ -315,6 +322,7 @@ class RoomMemberHandler(object): third_party_signed=third_party_signed, ratelimit=ratelimit, content=content, + require_consent=require_consent, ) defer.returnValue(result) @@ -331,6 +339,7 @@ class RoomMemberHandler(object): third_party_signed=None, ratelimit=True, content=None, + require_consent=True, ): content_specified = bool(content) if content is None: @@ -512,6 +521,7 @@ class RoomMemberHandler(object): ratelimit=ratelimit, prev_events_and_hashes=prev_events_and_hashes, content=content, + require_consent=require_consent, ) defer.returnValue(res) diff --git a/synapse/handlers/state_deltas.py b/synapse/handlers/state_deltas.py new file mode 100644 index 0000000000..b268bbcb2c --- /dev/null +++ b/synapse/handlers/state_deltas.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +logger = logging.getLogger(__name__) + + +class StateDeltasHandler(object): + + def __init__(self, hs): + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def _get_key_change(self, prev_event_id, event_id, key_name, public_value): + """Given two events check if the `key_name` field in content changed + from not matching `public_value` to doing so. 
+
+        For example, check if `history_visibility` (`key_name`) changed from
+        `shared` to `world_readable` (`public_value`).
+
+        Returns:
+            None if the field in the events either both match `public_value`
+            or if neither do, i.e. there has been no change.
+            True if it didn't match `public_value` but now does
+            False if it did match `public_value` but now doesn't
+        """
+        prev_event = None
+        event = None
+        if prev_event_id:
+            prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+
+        if event_id:
+            event = yield self.store.get_event(event_id, allow_none=True)
+
+        if not event and not prev_event:
+            logger.debug("Neither event exists: %r %r", prev_event_id, event_id)
+            defer.returnValue(None)
+
+        prev_value = None
+        value = None
+
+        if prev_event:
+            prev_value = prev_event.content.get(key_name)
+
+        if event:
+            value = event.content.get(key_name)
+
+        logger.debug("prev_value: %r -> value: %r", prev_value, value)
+
+        if value == public_value and prev_value != public_value:
+            defer.returnValue(True)
+        elif value != public_value and prev_value == public_value:
+            defer.returnValue(False)
+        else:
+            defer.returnValue(None)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index bd97241ab4..57bb996245 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -39,6 +39,9 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+# Debug logger for https://github.com/matrix-org/synapse/issues/4422
+issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
+
 
 # Counts the number of times we returned a non-empty sync. `type` is one of
 # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
@@ -962,6 +965,15 @@ class SyncHandler(object):
 
         yield self._generate_sync_entry_for_groups(sync_result_builder)
 
+        # debug for https://github.com/matrix-org/synapse/issues/4422
+        for joined_room in sync_result_builder.joined:
+            room_id = joined_room.room_id
+            if room_id in newly_joined_rooms:
+                issue4422_logger.debug(
+                    "Sync result for newly joined room %s: %r",
+                    room_id, joined_room,
+                )
+
         defer.returnValue(SyncResult(
             presence=sync_result_builder.presence,
             account_data=sync_result_builder.account_data,
@@ -1425,6 +1437,17 @@ class SyncHandler(object):
                 old_mem_ev = yield self.store.get_event(
                     old_mem_ev_id, allow_none=True
                 )
+
+                # debug for #4422
+                if has_join:
+                    prev_membership = None
+                    if old_mem_ev:
+                        prev_membership = old_mem_ev.membership
+                    issue4422_logger.debug(
+                        "Previous membership for room %s with join: %s (event %s)",
+                        room_id, prev_membership, old_mem_ev_id,
+                    )
+
                 if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
                     newly_joined_rooms.append(room_id)
 
@@ -1519,30 +1542,39 @@ class SyncHandler(object):
 
         for room_id in sync_result_builder.joined_room_ids:
             room_entry = room_to_events.get(room_id, None)
 
+            newly_joined = room_id in newly_joined_rooms
             if room_entry:
                 events, start_key = room_entry
 
                 prev_batch_token = now_token.copy_and_replace("room_key", start_key)
 
-                room_entries.append(RoomSyncResultBuilder(
+                entry = RoomSyncResultBuilder(
                     room_id=room_id,
                     rtype="joined",
                     events=events,
-                    newly_joined=room_id in newly_joined_rooms,
+                    newly_joined=newly_joined,
                     full_state=False,
-                    since_token=None if room_id in newly_joined_rooms else since_token,
+                    since_token=None if newly_joined else since_token,
                     upto_token=prev_batch_token,
-                ))
+                )
             else:
-                room_entries.append(RoomSyncResultBuilder(
+                entry = RoomSyncResultBuilder(
                     room_id=room_id,
                     rtype="joined",
                     events=[],
-                    newly_joined=room_id in newly_joined_rooms,
+                    newly_joined=newly_joined,
                     full_state=False,
                     since_token=since_token,
                     upto_token=since_token,
-                ))
+                )
+
+            if newly_joined:
+                # debugging for https://github.com/matrix-org/synapse/issues/4422
+                issue4422_logger.debug(
+                    "RoomSyncResultBuilder events for newly joined room %s: %r",
+                    room_id, entry.events,
+                )
+            room_entries.append(entry)
 
         defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms))
@@ -1663,6 +1695,13 @@ class SyncHandler(object):
             newly_joined_room=newly_joined,
         )
 
+        if newly_joined:
+            # debug for https://github.com/matrix-org/synapse/issues/4422
+            issue4422_logger.debug(
+                "Timeline events after filtering in newly-joined room %s: %r",
+                room_id, batch,
+            )
+
         # When we join the room (or the client requests full_state), we should
         # send down any existing tags. Usually the user won't have tags in a
         # newly joined room, unless either a) they've joined before or b) the
@@ -1894,15 +1933,34 @@ def _calculate_state(
 
 
 class SyncResultBuilder(object):
-    "Used to help build up a new SyncResult for a user"
+    """Used to help build up a new SyncResult for a user
+
+    Attributes:
+        sync_config (SyncConfig)
+        full_state (bool)
+        since_token (StreamToken)
+        now_token (StreamToken)
+        joined_room_ids (list[str])
+
+        # The following mirror the fields in a sync response
+        presence (list)
+        account_data (list)
+        joined (list[JoinedSyncResult])
+        invited (list[InvitedSyncResult])
+        archived (list[ArchivedSyncResult])
+        device (list)
+        groups (GroupsSyncResult|None)
+        to_device (list)
+    """
     def __init__(self, sync_config, full_state, since_token, now_token,
                  joined_room_ids):
         """
         Args:
-            sync_config(SyncConfig)
-            full_state(bool): The full_state flag as specified by user
-            since_token(StreamToken): The token supplied by user, or None.
-            now_token(StreamToken): The token to sync up to.
+            sync_config (SyncConfig)
+            full_state (bool): The full_state flag as specified by user
+            since_token (StreamToken): The token supplied by user, or None.
+            now_token (StreamToken): The token to sync up to.
+            joined_room_ids (list[str]): List of rooms the user is joined to
         """
         self.sync_config = sync_config
         self.full_state = full_state
@@ -1930,8 +1988,8 @@ class RoomSyncResultBuilder(object):
         Args:
             room_id(str)
             rtype(str): One of `"joined"` or `"archived"`
-            events(list): List of events to include in the room, (more events
-            may be added when generating result).
+            events(list[FrozenEvent]): List of events to include in the room
+            (more events may be added when generating result).
        newly_joined(bool): If the user has newly joined the room
        full_state(bool): Whether the full state should be sent in result
        since_token(StreamToken): Earliest point to return events from, or None
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index a61bbf9392..39df960c31 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -231,7 +231,7 @@ class TypingHandler(object):
         for domain in set(get_domain_from_id(u) for u in users):
             if domain != self.server_name:
                 logger.debug("sending typing update to %s", domain)
-                self.federation.send_edu(
+                self.federation.build_and_send_edu(
                     destination=domain,
                     edu_type="m.typing",
                     content={
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 283c6c1b81..b689979b4b 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -15,12 +15,13 @@
 
 import logging
 
-from six import iteritems
+from six import iteritems, iterkeys
 
 from twisted.internet import defer
 
 import synapse.metrics
 from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.handlers.state_deltas import StateDeltasHandler
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.roommember import ProfileInfo
 from synapse.types import get_localpart_from_id
@@ -29,7 +30,7 @@ from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
 
-class UserDirectoryHandler(object):
+class UserDirectoryHandler(StateDeltasHandler):
     """Handles querying of and keeping updated the user_directory.
 
     N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY
 
     The user directory is filled with users who this server can see are joined to a
     world_readable or publicly joinable room. We keep a database table up to date
     by streaming changes of the current state and recalculating whether users should
     be in the directory or not when necessary.
-
-    For each user in the directory we also store a room_id which is public and that the
-    user is joined to. This allows us to ignore history_visibility and join_rules changes
-    for that user in all other public rooms, as we know they'll still be in at least
-    one public room.
     """
 
-    INITIAL_ROOM_SLEEP_MS = 50
-    INITIAL_ROOM_SLEEP_COUNT = 100
-    INITIAL_ROOM_BATCH_SIZE = 100
-    INITIAL_USER_SLEEP_MS = 10
-
     def __init__(self, hs):
+        super(UserDirectoryHandler, self).__init__(hs)
+
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
         self.server_name = hs.hostname
@@ -59,15 +52,6 @@ class UserDirectoryHandler(object):
         self.is_mine_id = hs.is_mine_id
         self.update_user_directory = hs.config.update_user_directory
         self.search_all_users = hs.config.user_directory_search_all_users
-
-        # When start up for the first time we need to populate the user_directory.
-        # This is a set of user_id's we've inserted already
-        self.initially_handled_users = set()
-        self.initially_handled_users_in_public = set()
-
-        self.initially_handled_users_share = set()
-        self.initially_handled_users_share_private_room = set()
-
         # The current position in the current_state_delta stream
         self.pos = None
@@ -130,7 +114,7 @@ class UserDirectoryHandler(object):
         # Support users are for diagnostics and should not appear in the user directory.
        if not is_support:
            yield self.store.update_profile_in_user_dir(
-                user_id, profile.display_name, profile.avatar_url, None
+                user_id, profile.display_name, profile.avatar_url
            )
 
     @defer.inlineCallbacks
@@ -140,7 +124,6 @@ class UserDirectoryHandler(object):
         # FIXME(#3714): We should probably do this in the same worker as all
         # the other changes.
         yield self.store.remove_from_user_dir(user_id)
-        yield self.store.remove_from_user_in_public_room(user_id)
 
     @defer.inlineCallbacks
     def _unsafe_process(self):
@@ -148,10 +131,9 @@ class UserDirectoryHandler(object):
         if self.pos is None:
             self.pos = yield self.store.get_user_directory_stream_pos()
 
-        # If still None then we need to do the initial fill of directory
+        # If still None then the initial background update hasn't happened yet
         if self.pos is None:
-            yield self._do_initial_spam()
-            self.pos = yield self.store.get_user_directory_stream_pos()
+            defer.returnValue(None)
 
         # Loop round handling deltas until we're up to date
         while True:
@@ -173,149 +155,6 @@ class UserDirectoryHandler(object):
 
             yield self.store.update_user_directory_stream_pos(self.pos)
 
     @defer.inlineCallbacks
-    def _do_initial_spam(self):
-        """Populates the user_directory from the current state of the DB, used
-        when synapse first starts with user_directory support
-        """
-        new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()
-
-        # Delete any existing entries just in case there are any
-        yield self.store.delete_all_from_user_dir()
-
-        # We process by going through each existing room at a time.
-        room_ids = yield self.store.get_all_rooms()
-
-        logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
-        num_processed_rooms = 0
-
-        for room_id in room_ids:
-            logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
-            yield self._handle_initial_room(room_id)
-            num_processed_rooms += 1
-            yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
-        logger.info("Processed all rooms.")
-
-        if self.search_all_users:
-            num_processed_users = 0
-            user_ids = yield self.store.get_all_local_users()
-            logger.info(
-                "Doing initial update of user directory. %d users", len(user_ids)
-            )
-            for user_id in user_ids:
-                # We add profiles for all users even if they don't match the
-                # include pattern, just in case we want to change it in future
-                logger.info(
-                    "Handling user %d/%d", num_processed_users + 1, len(user_ids)
-                )
-                yield self._handle_local_user(user_id)
-                num_processed_users += 1
-                yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
-
-        logger.info("Processed all users")
-
-        self.initially_handled_users = None
-        self.initially_handled_users_in_public = None
-        self.initially_handled_users_share = None
-        self.initially_handled_users_share_private_room = None
-
-        yield self.store.update_user_directory_stream_pos(new_pos)
-
-    @defer.inlineCallbacks
-    def _handle_initial_room(self, room_id):
-        """Called when we initially fill out user_directory one room at a time
-        """
-        is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
-        if not is_in_room:
-            return
-
-        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
-            room_id
-        )
-
-        users_with_profile = yield self.state.get_current_user_in_room(room_id)
-        user_ids = set(users_with_profile)
-        unhandled_users = user_ids - self.initially_handled_users
-
-        yield self.store.add_profiles_to_user_dir(
-            room_id,
-            {user_id: users_with_profile[user_id] for user_id in unhandled_users},
-        )
-
-        self.initially_handled_users |= unhandled_users
-
-        if is_public:
-            yield self.store.add_users_to_public_room(
-                room_id, user_ids=user_ids - self.initially_handled_users_in_public
-            )
-            self.initially_handled_users_in_public |= user_ids
-
-        # We now go and figure out the new users who share rooms with user entries
-        # We sleep aggressively here as otherwise it can starve resources.
-        # We also batch up inserts/updates, but try to avoid too many at once.
-        to_insert = set()
-        to_update = set()
-        count = 0
-        for user_id in user_ids:
-            if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
-            if not self.is_mine_id(user_id):
-                count += 1
-                continue
-
-            if self.store.get_if_app_services_interested_in_user(user_id):
-                count += 1
-                continue
-
-            for other_user_id in user_ids:
-                if user_id == other_user_id:
-                    continue
-
-                if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-                count += 1
-
-                user_set = (user_id, other_user_id)
-
-                if user_set in self.initially_handled_users_share_private_room:
-                    continue
-
-                if user_set in self.initially_handled_users_share:
-                    if is_public:
-                        continue
-                    to_update.add(user_set)
-                else:
-                    to_insert.add(user_set)
-
-                if is_public:
-                    self.initially_handled_users_share.add(user_set)
-                else:
-                    self.initially_handled_users_share_private_room.add(user_set)
-
-                if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
-                    yield self.store.add_users_who_share_room(
-                        room_id, not is_public, to_insert
-                    )
-                    to_insert.clear()
-
-                if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE:
-                    yield self.store.update_users_who_share_room(
-                        room_id, not is_public, to_update
-                    )
-                    to_update.clear()
-
-        if to_insert:
-            yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
-            to_insert.clear()
-
-        if to_update:
-            yield self.store.update_users_who_share_room(
-                room_id, not is_public, to_update
-            )
-            to_update.clear()
-
-    @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
         """Called with the state deltas to process
         """
@@ -356,6 +195,7 @@ class UserDirectoryHandler(object):
             user_ids = yield self.store.get_users_in_dir_due_to_room(
                 room_id
             )
+
             for user_id in user_ids:
                 yield self._handle_remove_user(room_id, user_id)
             return
@@ -436,14 +276,20 @@ class UserDirectoryHandler(object):
             # ignore the change
             return
 
-        if change:
-            users_with_profile = yield self.state.get_current_user_in_room(room_id)
-            for user_id, profile in iteritems(users_with_profile):
-                yield self._handle_new_user(room_id, user_id, profile)
-        else:
-            users = yield self.store.get_users_in_public_due_to_room(room_id)
-            for user_id in users:
-                yield self._handle_remove_user(room_id, user_id)
+        users_with_profile = yield self.state.get_current_user_in_room(room_id)
+
+        # Remove every user from the sharing tables for that room.
+        for user_id in iterkeys(users_with_profile):
+            yield self.store.remove_user_who_share_room(user_id, room_id)
+
+        # Then, re-add them to the tables.
+        # NOTE: this is not the most efficient method, as handle_new_user sets
+        # up local_user -> other_user and other_user_whos_local -> local_user,
+        # which when run over an entire room, will result in the same values
+        # being added multiple times. The batching upserts shouldn't make this
+        # too bad, though.
+        for user_id, profile in iteritems(users_with_profile):
+            yield self._handle_new_user(room_id, user_id, profile)
 
     @defer.inlineCallbacks
     def _handle_local_user(self, user_id):
@@ -457,7 +303,9 @@ class UserDirectoryHandler(object):
 
         row = yield self.store.get_user_in_directory(user_id)
         if not row:
-            yield self.store.add_profiles_to_user_dir(None, {user_id: profile})
+            yield self.store.update_profile_in_user_dir(
+                user_id, profile.display_name, profile.avatar_url
+            )
 
     @defer.inlineCallbacks
     def _handle_new_user(self, room_id, user_id, profile):
@@ -469,177 +317,68 @@ class UserDirectoryHandler(object):
         """
         logger.debug("Adding new user to dir, %r", user_id)
 
-        row = yield self.store.get_user_in_directory(user_id)
-        if not row:
-            yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile})
+        yield self.store.update_profile_in_user_dir(
+            user_id, profile.display_name, profile.avatar_url
+        )
 
         is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
             room_id
         )
+        # Now we update the users who share rooms with this user.
+        users_with_profile = yield self.state.get_current_user_in_room(room_id)
 
         if is_public:
-            row = yield self.store.get_user_in_public_room(user_id)
-            if not row:
-                yield self.store.add_users_to_public_room(room_id, [user_id])
+            yield self.store.add_users_in_public_rooms(room_id, (user_id,))
         else:
-            logger.debug("Not adding new user to public dir, %r", user_id)
+            to_insert = set()
 
-        # Now we update users who share rooms with users. We do this by getting
-        # all the current users in the room and seeing which aren't already
-        # marked in the database as sharing with `user_id`
+            # First, if they're our user then we need to update for every user
+            if self.is_mine_id(user_id):
-        users_with_profile = yield self.state.get_current_user_in_room(room_id)
-
-        to_insert = set()
-        to_update = set()
+                is_appservice = self.store.get_if_app_services_interested_in_user(
+                    user_id
+                )
-        is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
+                # We don't care about appservice users.
+                if not is_appservice:
+                    for other_user_id in users_with_profile:
+                        if user_id == other_user_id:
+                            continue
-        # First, if they're our user then we need to update for every user
-        if self.is_mine_id(user_id) and not is_appservice:
-            # Returns a map of other_user_id -> shared_private. We only need
-            # to update mappings if for users that either don't share a room
-            # already (aren't in the map) or, if the room is private, those that
-            # only share a public room.
-            user_ids_shared = yield self.store.get_users_who_share_room_from_dir(
-                user_id
-            )
+                        to_insert.add((user_id, other_user_id))
 
+            # Next we need to update for every local user in the room
             for other_user_id in users_with_profile:
                 if user_id == other_user_id:
                     continue
 
-                shared_is_private = user_ids_shared.get(other_user_id)
-                if shared_is_private is True:
-                    # We've already marked in the database they share a private room
-                    continue
-                elif shared_is_private is False:
-                    # They already share a public room, so only update if this is
-                    # a private room
-                    if not is_public:
-                        to_update.add((user_id, other_user_id))
-                elif shared_is_private is None:
-                    # This is the first time they both share a room
-                    to_insert.add((user_id, other_user_id))
-
-        # Next we need to update for every local user in the room
-        for other_user_id in users_with_profile:
-            if user_id == other_user_id:
-                continue
-
-            is_appservice = self.store.get_if_app_services_interested_in_user(
-                other_user_id
-            )
-            if self.is_mine_id(other_user_id) and not is_appservice:
-                shared_is_private = yield self.store.get_if_users_share_a_room(
-                    other_user_id, user_id
+                is_appservice = self.store.get_if_app_services_interested_in_user(
+                    other_user_id
                 )
-                if shared_is_private is True:
-                    # We've already marked in the database they share a private room
-                    continue
-                elif shared_is_private is False:
-                    # They already share a public room, so only update if this is
-                    # a private room
-                    if not is_public:
-                        to_update.add((other_user_id, user_id))
-                elif shared_is_private is None:
-                    # This is the first time they both share a room
+                if self.is_mine_id(other_user_id) and not is_appservice:
                     to_insert.add((other_user_id, user_id))
 
-        if to_insert:
-            yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
-
-        if to_update:
-            yield self.store.update_users_who_share_room(
-                room_id, not is_public, to_update
-            )
+            if to_insert:
+                yield self.store.add_users_who_share_private_room(room_id, to_insert)
 
     @defer.inlineCallbacks
     def _handle_remove_user(self, room_id, user_id):
-        """Called when we might need to remove user to directory
+        """Called when we might need to remove user from directory
 
         Args:
            room_id (str): room_id that the user left, or that stopped being public
            user_id (str)
         """
-        logger.debug("Maybe removing user %r", user_id)
-
-        row = yield self.store.get_user_in_directory(user_id)
-        update_user_dir = row and row["room_id"] == room_id
-
-        row = yield self.store.get_user_in_public_room(user_id)
-        update_user_in_public = row and row["room_id"] == room_id
-
-        if update_user_in_public or update_user_dir:
-            # XXX: Make this faster?
-            rooms = yield self.store.get_rooms_for_user(user_id)
-            for j_room_id in rooms:
-                if not update_user_in_public and not update_user_dir:
-                    break
-
-                is_in_room = yield self.store.is_host_joined(
-                    j_room_id, self.server_name
-                )
-
-                if not is_in_room:
-                    continue
+        logger.debug("Removing user %r", user_id)
 
-                if update_user_dir:
-                    update_user_dir = False
-                    yield self.store.update_user_in_user_dir(user_id, j_room_id)
+        # Remove user from sharing tables
+        yield self.store.remove_user_who_share_room(user_id, room_id)
 
-                is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
-                    j_room_id
-                )
+        # Are they still in any rooms? If not, remove them entirely.
+        rooms_user_is_in = yield self.store.get_user_dir_rooms_user_is_in(user_id)
 
-                if update_user_in_public and is_public:
-                    yield self.store.update_user_in_public_user_list(user_id, j_room_id)
-                    update_user_in_public = False
-
-        if update_user_dir:
+        if len(rooms_user_is_in) == 0:
             yield self.store.remove_from_user_dir(user_id)
-        elif update_user_in_public:
-            yield self.store.remove_from_user_in_public_room(user_id)
-
-        # Now handle users_who_share_rooms.
-
-        # Get a list of user tuples that were in the DB due to this room and
-        # users (this includes tuples where the other user matches `user_id`)
-        user_tuples = yield self.store.get_users_in_share_dir_with_room_id(
-            user_id, room_id
-        )
-
-        for user_id, other_user_id in user_tuples:
-            # For each user tuple get a list of rooms that they still share,
-            # trying to find a private room, and update the entry in the DB
-            rooms = yield self.store.get_rooms_in_common_for_users(
-                user_id, other_user_id
-            )
-
-            # If they dont share a room anymore, remove the mapping
-            if not rooms:
-                yield self.store.remove_user_who_share_room(user_id, other_user_id)
-                continue
-
-            found_public_share = None
-            for j_room_id in rooms:
-                is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
-                    j_room_id
-                )
-
-                if is_public:
-                    found_public_share = j_room_id
-                else:
-                    found_public_share = None
-                    yield self.store.update_users_who_share_room(
-                        room_id, not is_public, [(user_id, other_user_id)]
-                    )
-                    break
-
-            if found_public_share:
-                yield self.store.update_users_who_share_room(
-                    room_id, not is_public, [(user_id, other_user_id)]
-                )
 
     @defer.inlineCallbacks
     def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id):
@@ -665,50 +404,4 @@ class UserDirectoryHandler(object):
             new_avatar = event.content.get("avatar_url")
 
             if prev_name != new_name or prev_avatar != new_avatar:
-                yield self.store.update_profile_in_user_dir(
-                    user_id, new_name, new_avatar, room_id
-                )
-
-    @defer.inlineCallbacks
-    def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
-        """Given two events check if the `key_name` field in content changed
-        from not matching `public_value` to doing so.
-
-        For example, check if `history_visibility` (`key_name`) changed from
-        `shared` to `world_readable` (`public_value`).
-
-        Returns:
-            None if the field in the events either both match `public_value`
-            or if neither do, i.e. there has been no change.
-            True if it didnt match `public_value` but now does
-            False if it did match `public_value` but now doesn't
-        """
-        prev_event = None
-        event = None
-        if prev_event_id:
-            prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
-
-        if event_id:
-            event = yield self.store.get_event(event_id, allow_none=True)
-
-        if not event and not prev_event:
-            logger.debug("Neither event exists: %r %r", prev_event_id, event_id)
-            defer.returnValue(None)
-
-        prev_value = None
-        value = None
-
-        if prev_event:
-            prev_value = prev_event.content.get(key_name)
-
-        if event:
-            value = event.content.get(key_name)
-
-        logger.debug("prev_value: %r -> value: %r", prev_value, value)
-
-        if value == public_value and prev_value != public_value:
-            defer.returnValue(True)
-        elif value != public_value and prev_value == public_value:
-            defer.returnValue(False)
-        else:
-            defer.returnValue(None)
+        yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
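
For reference, the tri-state return contract of the `_get_key_change` helper moved onto `StateDeltasHandler` above is easy to get wrong. The following is a minimal standalone sketch of just that logic, assuming event contents are plain dicts; the function name and example values are illustrative, not Synapse's API:

def get_key_change(prev_content, content, key_name, public_value):
    """Sketch: True if key_name newly matches public_value, False if it
    stopped matching, None if there was no change either way."""
    prev_value = prev_content.get(key_name) if prev_content else None
    value = content.get(key_name) if content else None

    if value == public_value and prev_value != public_value:
        return True
    if value != public_value and prev_value == public_value:
        return False
    return None


# A room whose history_visibility flips to world_readable reports True:
assert get_key_change(
    {"history_visibility": "shared"},
    {"history_visibility": "world_readable"},
    "history_visibility",
    "world_readable",
) is True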
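The private-room branch of the rewritten `_handle_new_user` builds a set of ordered (user, other_user) pairs: a local joiner is paired with everyone in the room, and every local existing member is paired with the joiner. A rough sketch of just that pairing, with `is_local` standing in for `is_mine_id` and the appservice check omitted (names here are assumptions for illustration):

def sharing_pairs(new_user, users_in_room, is_local):
    to_insert = set()
    # A local joiner is paired with every other member, local or remote.
    if is_local(new_user):
        for other in users_in_room:
            if other != new_user:
                to_insert.add((new_user, other))
    # Every *local* existing member is also paired with the joiner, so the
    # mapping can be looked up from either direction.
    for other in users_in_room:
        if other != new_user and is_local(other):
            to_insert.add((other, new_user))
    return to_insert


def is_local(user_id):
    return user_id.endswith(":hs")


assert sharing_pairs("@new:hs", {"@new:hs", "@a:hs", "@b:remote"}, is_local) == {
    ("@new:hs", "@a:hs"),
    ("@new:hs", "@b:remote"),
    ("@a:hs", "@new:hs"),
}

As the NOTE in the diff says, running this over a whole room re-inserts existing pairs; the batched upserts are what keep that tolerable.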
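Removal is simplified in the same spirit: drop the user's sharing rows for the room, then drop the directory entry only if they are in no rooms at all. A sketch with a toy in-memory store; only the method names mirror the diff, the store class and its state are invented for illustration:

class ToyStore(object):
    def __init__(self):
        self.user_dir = {"@alice:hs"}
        self.rooms_for_user = {"@alice:hs": set()}  # no rooms left

    def remove_user_who_share_room(self, user_id, room_id):
        pass  # would delete the sharing rows scoped to this room

    def get_user_dir_rooms_user_is_in(self, user_id):
        return self.rooms_for_user.get(user_id, set())

    def remove_from_user_dir(self, user_id):
        self.user_dir.discard(user_id)


def handle_remove_user(store, room_id, user_id):
    # Remove the user's sharing rows for this room first...
    store.remove_user_who_share_room(user_id, room_id)
    # ...then drop them from the directory only if no rooms remain.
    if len(store.get_user_dir_rooms_user_is_in(user_id)) == 0:
        store.remove_from_user_dir(user_id)


store = ToyStore()
handle_remove_user(store, "!room:hs", "@alice:hs")
assert "@alice:hs" not in store.user_dir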