diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/_base.py | 137 | ||||
-rw-r--r-- | synapse/handlers/auth.py | 93 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 137 | ||||
-rw-r--r-- | synapse/handlers/message.py | 45 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 43 | ||||
-rw-r--r-- | synapse/handlers/receipts.py | 22 | ||||
-rw-r--r-- | synapse/handlers/register.py | 5 | ||||
-rw-r--r-- | synapse/handlers/room.py | 28 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 314 |
9 files changed, 481 insertions, 343 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 5eeb7042c6..13a675b208 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -21,7 +21,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.types import UserID, RoomAlias, Requester from synapse.push.action_generator import ActionGenerator -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import PreserveLoggingContext, preserve_fn import logging @@ -37,6 +37,15 @@ VISIBILITY_PRIORITY = ( ) +MEMBERSHIP_PRIORITY = ( + Membership.JOIN, + Membership.INVITE, + Membership.KNOCK, + Membership.LEAVE, + Membership.BAN, +) + + class BaseHandler(object): """ Common base class for the event handlers. @@ -72,6 +81,7 @@ class BaseHandler(object): * the user is not currently a member of the room, and: * the user has not been a member of the room since the given events + events ([synapse.events.EventBase]): list of events to filter """ forgotten = yield defer.gatherResults([ self.store.who_forgot_in_room( @@ -86,6 +96,12 @@ class BaseHandler(object): ) def allowed(event, user_id, is_peeking): + """ + Args: + event (synapse.events.EventBase): event to check + user_id (str) + is_peeking (bool) + """ state = event_id_to_state[event.event_id] # get the room_visibility at the time of the event. @@ -117,17 +133,30 @@ class BaseHandler(object): if old_priority < new_priority: visibility = prev_visibility - # get the user's membership at the time of the event. (or rather, - # just *after* the event. Which means that people can see their - # own join events, but not (currently) their own leave events.) - membership_event = state.get((EventTypes.Member, user_id), None) - if membership_event: - if membership_event.event_id in event_id_forgotten: - membership = None - else: - membership = membership_event.membership - else: - membership = None + # likewise, if the event is the user's own membership event, use + # the 'most joined' membership + membership = None + if event.type == EventTypes.Member and event.state_key == user_id: + membership = event.content.get("membership", None) + if membership not in MEMBERSHIP_PRIORITY: + membership = "leave" + + prev_content = event.unsigned.get("prev_content", {}) + prev_membership = prev_content.get("membership", None) + if prev_membership not in MEMBERSHIP_PRIORITY: + prev_membership = "leave" + + new_priority = MEMBERSHIP_PRIORITY.index(membership) + old_priority = MEMBERSHIP_PRIORITY.index(prev_membership) + if old_priority < new_priority: + membership = prev_membership + + # otherwise, get the user's membership at the time of the event. + if membership is None: + membership_event = state.get((EventTypes.Member, user_id), None) + if membership_event: + if membership_event.event_id not in event_id_forgotten: + membership = membership_event.membership # if the user was a member of the room at the time of the event, # they can see it. @@ -204,20 +233,25 @@ class BaseHandler(object): ) @defer.inlineCallbacks - def _create_new_client_event(self, builder): - latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room( - builder.room_id, - ) - - if latest_ret: - depth = max([d for _, _, d in latest_ret]) + 1 + def _create_new_client_event(self, builder, prev_event_ids=None): + if prev_event_ids: + prev_events = yield self.store.add_event_hashes(prev_event_ids) + prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) + depth = prev_max_depth + 1 else: - depth = 1 + latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room( + builder.room_id, + ) - prev_events = [ - (event_id, prev_hashes) - for event_id, prev_hashes, _ in latest_ret - ] + if latest_ret: + depth = max([d for _, _, d in latest_ret]) + 1 + else: + depth = 1 + + prev_events = [ + (event_id, prev_hashes) + for event_id, prev_hashes, _ in latest_ret + ] builder.prev_events = prev_events builder.depth = depth @@ -226,49 +260,6 @@ class BaseHandler(object): context = yield state_handler.compute_event_context(builder) - # If we've received an invite over federation, there are no latest - # events in the room, because we don't know enough about the graph - # fragment we received to treat it like a graph, so the above returned - # no relevant events. It may have returned some events (if we have - # joined and left the room), but not useful ones, like the invite. - if ( - not self.is_host_in_room(context.current_state) and - builder.type == EventTypes.Member - ): - prev_member_event = yield self.store.get_room_member( - builder.sender, builder.room_id - ) - - # The prev_member_event may already be in context.current_state, - # despite us not being present in the room; in particular, if - # inviting user, and all other local users, have already left. - # - # In that case, we have all the information we need, and we don't - # want to drop "context" - not least because we may need to handle - # the invite locally, which will require us to have the whole - # context (not just prev_member_event) to auth it. - # - context_event_ids = ( - e.event_id for e in context.current_state.values() - ) - - if ( - prev_member_event and - prev_member_event.event_id not in context_event_ids - ): - # The prev_member_event is missing from context, so it must - # have arrived over federation and is an outlier. We forcibly - # set our context to the invite we received over federation - builder.prev_events = ( - prev_member_event.event_id, - prev_member_event.prev_events - ) - - context = yield state_handler.compute_event_context( - builder, - old_state=(prev_member_event,) - ) - if builder.is_state(): builder.prev_state = yield self.store.add_event_hashes( context.prev_state_events @@ -325,7 +316,11 @@ class BaseHandler(object): if ratelimit: self.ratelimit(requester) - self.auth.check(event, auth_events=context.current_state) + try: + self.auth.check(event, auth_events=context.current_state) + except AuthError as err: + logger.warn("Denying new event %r because %s", event, err) + raise err yield self.maybe_kick_guest_users(event, context.current_state.values()) @@ -415,6 +410,12 @@ class BaseHandler(object): event, context=context ) + # this intentionally does not yield: we don't care about the result + # and don't need to wait for it. + preserve_fn(self.hs.get_pusherpool().on_new_notifications)( + event_stream_id, max_stream_id + ) + destinations = set() for k, s in context.current_state.items(): try: diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index d5d6faa85f..61fe56032a 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -49,6 +49,21 @@ class AuthHandler(BaseHandler): self.sessions = {} self.INVALID_TOKEN_HTTP_STATUS = 401 + self.ldap_enabled = hs.config.ldap_enabled + self.ldap_server = hs.config.ldap_server + self.ldap_port = hs.config.ldap_port + self.ldap_tls = hs.config.ldap_tls + self.ldap_search_base = hs.config.ldap_search_base + self.ldap_search_property = hs.config.ldap_search_property + self.ldap_email_property = hs.config.ldap_email_property + self.ldap_full_name_property = hs.config.ldap_full_name_property + + if self.ldap_enabled is True: + import ldap + logger.info("Import ldap version: %s", ldap.__version__) + + self.hs = hs # FIXME better possibility to access registrationHandler later? + @defer.inlineCallbacks def check_auth(self, flows, clientdict, clientip): """ @@ -215,8 +230,10 @@ class AuthHandler(BaseHandler): if not user_id.startswith('@'): user_id = UserID.create(user_id, self.hs.hostname).to_string() - user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id) - self._check_password(user_id, password, password_hash) + if not (yield self._check_password(user_id, password)): + logger.warn("Failed password login for user %s", user_id) + raise LoginError(403, "", errcode=Codes.FORBIDDEN) + defer.returnValue(user_id) @defer.inlineCallbacks @@ -340,8 +357,10 @@ class AuthHandler(BaseHandler): StoreError if there was a problem storing the token. LoginError if there was an authentication problem. """ - user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id) - self._check_password(user_id, password, password_hash) + + if not (yield self._check_password(user_id, password)): + logger.warn("Failed password login for user %s", user_id) + raise LoginError(403, "", errcode=Codes.FORBIDDEN) logger.info("Logging in user %s", user_id) access_token = yield self.issue_access_token(user_id) @@ -407,11 +426,67 @@ class AuthHandler(BaseHandler): else: defer.returnValue(user_infos.popitem()) - def _check_password(self, user_id, password, stored_hash): - """Checks that user_id has passed password, raises LoginError if not.""" - if not self.validate_hash(password, stored_hash): - logger.warn("Failed password login for user %s", user_id) - raise LoginError(403, "", errcode=Codes.FORBIDDEN) + @defer.inlineCallbacks + def _check_password(self, user_id, password): + """ + Returns: + True if the user_id successfully authenticated + """ + valid_ldap = yield self._check_ldap_password(user_id, password) + if valid_ldap: + defer.returnValue(True) + + valid_local_password = yield self._check_local_password(user_id, password) + if valid_local_password: + defer.returnValue(True) + + defer.returnValue(False) + + @defer.inlineCallbacks + def _check_local_password(self, user_id, password): + try: + user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id) + defer.returnValue(self.validate_hash(password, password_hash)) + except LoginError: + defer.returnValue(False) + + @defer.inlineCallbacks + def _check_ldap_password(self, user_id, password): + if not self.ldap_enabled: + logger.debug("LDAP not configured") + defer.returnValue(False) + + import ldap + + logger.info("Authenticating %s with LDAP" % user_id) + try: + ldap_url = "%s:%s" % (self.ldap_server, self.ldap_port) + logger.debug("Connecting LDAP server at %s" % ldap_url) + l = ldap.initialize(ldap_url) + if self.ldap_tls: + logger.debug("Initiating TLS") + self._connection.start_tls_s() + + local_name = UserID.from_string(user_id).localpart + + dn = "%s=%s, %s" % ( + self.ldap_search_property, + local_name, + self.ldap_search_base) + logger.debug("DN for LDAP authentication: %s" % dn) + + l.simple_bind_s(dn.encode('utf-8'), password.encode('utf-8')) + + if not (yield self.does_user_exist(user_id)): + handler = self.hs.get_handlers().registration_handler + user_id, access_token = ( + yield handler.register(localpart=local_name) + ) + + defer.returnValue(True) + except ldap.LDAPError, e: + logger.warn("LDAP error: %s", e) + defer.returnValue(False) @defer.inlineCallbacks def issue_access_token(self, user_id): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index adafd06b24..d95e0b23b1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -26,7 +26,7 @@ from synapse.api.errors import ( from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import PreserveLoggingContext, preserve_fn from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.util.frozenutils import unfreeze @@ -40,6 +40,7 @@ from synapse.events.utils import prune_event from synapse.util.retryutils import NotRetryingDestination from synapse.push.action_generator import ActionGenerator +from synapse.util.distributor import user_joined_room from twisted.internet import defer @@ -49,10 +50,6 @@ import logging logger = logging.getLogger(__name__) -def user_joined_room(distributor, user, room_id): - return distributor.fire("user_joined_room", user, room_id) - - class FederationHandler(BaseHandler): """Handles events that originated from federation. Responsible for: @@ -283,7 +280,14 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def backfill(self, dest, room_id, limit, extremities=[]): """ Trigger a backfill request to `dest` for the given `room_id` + + This will attempt to get more events from the remote. This may return + be successfull and still return no events if the other side has no new + events to offer. """ + if dest == self.server_name: + raise SynapseError(400, "Can't backfill from self.") + if not extremities: extremities = yield self.store.get_oldest_events_in_room(room_id) @@ -294,6 +298,16 @@ class FederationHandler(BaseHandler): extremities=extremities, ) + # Don't bother processing events we already have. + seen_events = yield self.store.have_events_in_timeline( + set(e.event_id for e in events) + ) + + events = [e for e in events if e.event_id not in seen_events] + + if not events: + defer.returnValue([]) + event_map = {e.event_id: e for e in events} event_ids = set(e.event_id for e in events) @@ -353,6 +367,7 @@ class FederationHandler(BaseHandler): for a in auth_events.values(): if a.event_id in seen_events: continue + a.internal_metadata.outlier = True ev_infos.append({ "event": a, "auth_events": { @@ -373,20 +388,23 @@ class FederationHandler(BaseHandler): } }) + yield self._handle_new_events( + dest, ev_infos, + backfilled=True, + ) + events.sort(key=lambda e: e.depth) for event in events: if event in events_to_state: continue - ev_infos.append({ - "event": event, - }) - - yield self._handle_new_events( - dest, ev_infos, - backfilled=True, - ) + # We store these one at a time since each event depends on the + # previous to work out the state. + # TODO: We can probably do something more clever here. + yield self._handle_new_event( + dest, event + ) defer.returnValue(events) @@ -450,7 +468,7 @@ class FederationHandler(BaseHandler): likely_domains = [ domain for domain, depth in curr_domains - if domain is not self.server_name + if domain != self.server_name ] @defer.inlineCallbacks @@ -458,11 +476,15 @@ class FederationHandler(BaseHandler): # TODO: Should we try multiple of these at a time? for dom in domains: try: - events = yield self.backfill( + yield self.backfill( dom, room_id, limit=100, extremities=[e for e in extremities.keys()] ) + # If this succeeded then we probably already have the + # appropriate stuff. + # TODO: We can probably do something more intelligent here. + defer.returnValue(True) except SynapseError as e: logger.info( "Failed to backfill from %s because %s", @@ -488,8 +510,6 @@ class FederationHandler(BaseHandler): ) continue - if events: - defer.returnValue(True) defer.returnValue(False) success = yield try_backfill(likely_domains) @@ -661,9 +681,13 @@ class FederationHandler(BaseHandler): "state_key": user_id, }) - event, context = yield self._create_new_client_event( - builder=builder, - ) + try: + event, context = yield self._create_new_client_event( + builder=builder, + ) + except AuthError as e: + logger.warn("Failed to create join %r because %s", event, e) + raise e self.auth.check(event, auth_events=context.current_state) @@ -784,13 +808,19 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def do_remotely_reject_invite(self, target_hosts, room_id, user_id): - origin, event = yield self._make_and_verify_event( - target_hosts, - room_id, - user_id, - "leave" - ) - signed_event = self._sign_event(event) + try: + origin, event = yield self._make_and_verify_event( + target_hosts, + room_id, + user_id, + "leave" + ) + signed_event = self._sign_event(event) + except SynapseError: + raise + except CodeMessageException as e: + logger.warn("Failed to reject invite: %s", e) + raise SynapseError(500, "Failed to reject invite") # Try the host we successfully got a response to /make_join/ # request first. @@ -800,10 +830,16 @@ class FederationHandler(BaseHandler): except ValueError: pass - yield self.replication_layer.send_leave( - target_hosts, - signed_event - ) + try: + yield self.replication_layer.send_leave( + target_hosts, + signed_event + ) + except SynapseError: + raise + except CodeMessageException as e: + logger.warn("Failed to reject invite: %s", e) + raise SynapseError(500, "Failed to reject invite") context = yield self.state_handler.compute_event_context(event) @@ -883,7 +919,11 @@ class FederationHandler(BaseHandler): builder=builder, ) - self.auth.check(event, auth_events=context.current_state) + try: + self.auth.check(event, auth_events=context.current_state) + except AuthError as e: + logger.warn("Failed to create new leave %r because %s", event, e) + raise e defer.returnValue(event) @@ -1064,7 +1104,8 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def _handle_new_event(self, origin, event, state=None, auth_events=None): + def _handle_new_event(self, origin, event, state=None, auth_events=None, + backfilled=False): context = yield self._prep_event( origin, event, state=state, @@ -1080,12 +1121,24 @@ class FederationHandler(BaseHandler): event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, + backfilled=backfilled, + ) + + # this intentionally does not yield: we don't care about the result + # and don't need to wait for it. + preserve_fn(self.hs.get_pusherpool().on_new_notifications)( + event_stream_id, max_stream_id ) defer.returnValue((context, event_stream_id, max_stream_id)) @defer.inlineCallbacks def _handle_new_events(self, origin, event_infos, backfilled=False): + """Creates the appropriate contexts and persists events. The events + should not depend on one another, e.g. this should be used to persist + a bunch of outliers, but not a chunk of individual events that depend + on each other for state calculations. + """ contexts = yield defer.gatherResults( [ self._prep_event( @@ -1467,8 +1520,9 @@ class FederationHandler(BaseHandler): try: self.auth.check(event, auth_events=auth_events) - except AuthError: - raise + except AuthError as e: + logger.warn("Failed auth resolution for %r because %s", event, e) + raise e @defer.inlineCallbacks def construct_auth_difference(self, local_auth, remote_auth): @@ -1644,7 +1698,12 @@ class FederationHandler(BaseHandler): event_dict, event, context ) - self.auth.check(event, context.current_state) + try: + self.auth.check(event, context.current_state) + except AuthError as e: + logger.warn("Denying new third party invite %r because %s", event, e) + raise e + yield self._check_signature(event, auth_events=context.current_state) member_handler = self.hs.get_handlers().room_member_handler yield member_handler.send_membership_event(None, event, context) @@ -1669,7 +1728,11 @@ class FederationHandler(BaseHandler): event_dict, event, context ) - self.auth.check(event, auth_events=context.current_state) + try: + self.auth.check(event, auth_events=context.current_state) + except AuthError as e: + logger.warn("Denying third party invite %r because %s", event, e) + raise e yield self._check_signature(event, auth_events=context.current_state) returned_invite = yield self.send_invite(origin, event) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 0bb111d047..f51feda2f4 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -34,10 +34,6 @@ import logging logger = logging.getLogger(__name__) -def collect_presencelike_data(distributor, user, content): - return distributor.fire("collect_presencelike_data", user, content) - - class MessageHandler(BaseHandler): def __init__(self, hs): @@ -49,35 +45,6 @@ class MessageHandler(BaseHandler): self.snapshot_cache = SnapshotCache() @defer.inlineCallbacks - def get_message(self, msg_id=None, room_id=None, sender_id=None, - user_id=None): - """ Retrieve a message. - - Args: - msg_id (str): The message ID to obtain. - room_id (str): The room where the message resides. - sender_id (str): The user ID of the user who sent the message. - user_id (str): The user ID of the user making this request. - Returns: - The message, or None if no message exists. - Raises: - SynapseError if something went wrong. - """ - yield self.auth.check_joined_room(room_id, user_id) - - # Pull out the message from the db -# msg = yield self.store.get_message( -# room_id=room_id, -# msg_id=msg_id, -# user_id=sender_id -# ) - - # TODO (erikj): Once we work out the correct c-s api we need to think - # on how to do this. - - defer.returnValue(None) - - @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, as_client_event=True): """Get messages in a room. @@ -176,7 +143,7 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) @defer.inlineCallbacks - def create_event(self, event_dict, token_id=None, txn_id=None): + def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None): """ Given a dict from a client, create a new event. @@ -187,6 +154,9 @@ class MessageHandler(BaseHandler): Args: event_dict (dict): An entire event + token_id (str) + txn_id (str) + prev_event_ids (list): The prev event ids to use when creating the event Returns: Tuple of created event (FrozenEvent), Context @@ -199,12 +169,8 @@ class MessageHandler(BaseHandler): membership = builder.content.get("membership", None) target = UserID.from_string(builder.state_key) - if membership == Membership.JOIN: + if membership in {Membership.JOIN, Membership.INVITE}: # If event doesn't include a display name, add one. - yield collect_presencelike_data( - self.distributor, target, builder.content - ) - elif membership == Membership.INVITE: profile = self.hs.get_handlers().profile_handler content = builder.content @@ -225,6 +191,7 @@ class MessageHandler(BaseHandler): event, context = yield self._create_new_client_event( builder=builder, + prev_event_ids=prev_event_ids, ) defer.returnValue((event, context)) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index b45eafbb49..e37409170d 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.types import UserID, Requester -from synapse.util import unwrapFirstError from ._base import BaseHandler @@ -27,14 +26,6 @@ import logging logger = logging.getLogger(__name__) -def changed_presencelike_data(distributor, user, state): - return distributor.fire("changed_presencelike_data", user, state) - - -def collect_presencelike_data(distributor, user, content): - return distributor.fire("collect_presencelike_data", user, content) - - class ProfileHandler(BaseHandler): def __init__(self, hs): @@ -46,17 +37,9 @@ class ProfileHandler(BaseHandler): ) distributor = hs.get_distributor() - self.distributor = distributor - - distributor.declare("collect_presencelike_data") - distributor.declare("changed_presencelike_data") distributor.observe("registered_user", self.registered_user) - distributor.observe( - "collect_presencelike_data", self.collect_presencelike_data - ) - def registered_user(self, user): return self.store.create_profile(user.localpart) @@ -105,10 +88,6 @@ class ProfileHandler(BaseHandler): target_user.localpart, new_displayname ) - yield changed_presencelike_data(self.distributor, target_user, { - "displayname": new_displayname, - }) - yield self._update_join_states(requester) @defer.inlineCallbacks @@ -152,31 +131,9 @@ class ProfileHandler(BaseHandler): target_user.localpart, new_avatar_url ) - yield changed_presencelike_data(self.distributor, target_user, { - "avatar_url": new_avatar_url, - }) - yield self._update_join_states(requester) @defer.inlineCallbacks - def collect_presencelike_data(self, user, state): - if not self.hs.is_mine(user): - defer.returnValue(None) - - (displayname, avatar_url) = yield defer.gatherResults( - [ - self.store.get_profile_displayname(user.localpart), - self.store.get_profile_avatar_url(user.localpart), - ], - consumeErrors=True - ).addErrback(unwrapFirstError) - - state["displayname"] = displayname - state["avatar_url"] = avatar_url - - defer.returnValue(None) - - @defer.inlineCallbacks def on_profile_query(self, args): user = UserID.from_string(args["user_id"]) if not self.hs.is_mine(user): diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 935c339707..a390a1b8bd 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -80,6 +80,9 @@ class ReceiptsHandler(BaseHandler): def _handle_new_receipts(self, receipts): """Takes a list of receipts, stores them and informs the notifier. """ + min_batch_id = None + max_batch_id = None + for receipt in receipts: room_id = receipt["room_id"] receipt_type = receipt["receipt_type"] @@ -97,10 +100,21 @@ class ReceiptsHandler(BaseHandler): stream_id, max_persisted_id = res - with PreserveLoggingContext(): - self.notifier.on_new_event( - "receipt_key", max_persisted_id, rooms=[room_id] - ) + if min_batch_id is None or stream_id < min_batch_id: + min_batch_id = stream_id + if max_batch_id is None or max_persisted_id > max_batch_id: + max_batch_id = max_persisted_id + + affected_room_ids = list(set([r["room_id"] for r in receipts])) + + with PreserveLoggingContext(): + self.notifier.on_new_event( + "receipt_key", max_batch_id, rooms=affected_room_ids + ) + # Note that the min here shouldn't be relied upon to be accurate. + self.hs.get_pusherpool().on_new_receipts( + min_batch_id, max_batch_id, affected_room_ids + ) defer.returnValue(True) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index f287ee247b..b0862067e1 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -23,6 +23,7 @@ from synapse.api.errors import ( from ._base import BaseHandler from synapse.util.async import run_on_reactor from synapse.http.client import CaptchaServerHttpClient +from synapse.util.distributor import registered_user import logging import urllib @@ -30,10 +31,6 @@ import urllib logger = logging.getLogger(__name__) -def registered_user(distributor, user): - return distributor.fire("registered_user", user) - - class RegistrationHandler(BaseHandler): def __init__(self, hs): diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3e1d9282d7..dd9c18df84 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -25,7 +25,6 @@ from synapse.api.constants import ( from synapse.api.errors import AuthError, StoreError, SynapseError from synapse.util import stringutils from synapse.util.async import concurrently_execute -from synapse.util.logcontext import preserve_context_over_fn from synapse.util.caches.response_cache import ResponseCache from collections import OrderedDict @@ -39,20 +38,6 @@ logger = logging.getLogger(__name__) id_server_scheme = "https://" -def user_left_room(distributor, user, room_id): - return preserve_context_over_fn( - distributor.fire, - "user_left_room", user=user, room_id=room_id - ) - - -def user_joined_room(distributor, user, room_id): - return preserve_context_over_fn( - distributor.fire, - "user_joined_room", user=user, room_id=room_id - ) - - class RoomCreationHandler(BaseHandler): PRESETS_DICT = { @@ -373,8 +358,6 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def handle_room(room_id): - aliases = yield self.store.get_aliases_for_room(room_id) - # We pull each bit of state out indvidually to avoid pulling the # full state into memory. Due to how the caching works this should # be fairly quick, even if not originally in the cache. @@ -389,6 +372,14 @@ class RoomListHandler(BaseHandler): defer.returnValue(None) result = {"room_id": room_id} + + joined_users = yield self.store.get_users_in_room(room_id) + if len(joined_users) == 0: + return + + result["num_joined_members"] = len(joined_users) + + aliases = yield self.store.get_aliases_for_room(room_id) if aliases: result["aliases"] = aliases @@ -428,9 +419,6 @@ class RoomListHandler(BaseHandler): if avatar_url: result["avatar_url"] = avatar_url - joined_users = yield self.store.get_users_in_room(room_id) - result["num_joined_members"] = len(joined_users) - results.append(result) yield concurrently_execute(handle_room, room_ids, 10) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 01f833c371..b69f36aefe 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -23,7 +23,8 @@ from synapse.api.constants import ( EventTypes, Membership, ) from synapse.api.errors import AuthError, SynapseError, Codes -from synapse.util.logcontext import preserve_context_over_fn +from synapse.util.async import Linearizer +from synapse.util.distributor import user_left_room, user_joined_room from signedjson.sign import verify_signed_json from signedjson.key import decode_verify_key_bytes @@ -37,20 +38,6 @@ logger = logging.getLogger(__name__) id_server_scheme = "https://" -def user_left_room(distributor, user, room_id): - return preserve_context_over_fn( - distributor.fire, - "user_left_room", user=user, room_id=room_id - ) - - -def user_joined_room(distributor, user, room_id): - return preserve_context_over_fn( - distributor.fire, - "user_joined_room", user=user, room_id=room_id - ) - - class RoomMemberHandler(BaseHandler): # TODO(paul): This handler currently contains a messy conflation of # low-level API that works on UserID objects and so on, and REST-level @@ -60,6 +47,8 @@ class RoomMemberHandler(BaseHandler): def __init__(self, hs): super(RoomMemberHandler, self).__init__(hs) + self.member_linearizer = Linearizer() + self.clock = hs.get_clock() self.distributor = hs.get_distributor() @@ -96,6 +85,82 @@ class RoomMemberHandler(BaseHandler): remotedomains.add(member.domain) @defer.inlineCallbacks + def _local_membership_update( + self, requester, target, room_id, membership, + prev_event_ids, + txn_id=None, + ratelimit=True, + ): + msg_handler = self.hs.get_handlers().message_handler + + content = {"membership": membership} + if requester.is_guest: + content["kind"] = "guest" + + event, context = yield msg_handler.create_event( + { + "type": EventTypes.Member, + "content": content, + "room_id": room_id, + "sender": requester.user.to_string(), + "state_key": target.to_string(), + + # For backwards compatibility: + "membership": membership, + }, + token_id=requester.access_token_id, + txn_id=txn_id, + prev_event_ids=prev_event_ids, + ) + + yield self.handle_new_client_event( + requester, + event, + context, + extra_users=[target], + ratelimit=ratelimit, + ) + + prev_member_event = context.current_state.get( + (EventTypes.Member, target.to_string()), + None + ) + + if event.membership == Membership.JOIN: + if not prev_member_event or prev_member_event.membership != Membership.JOIN: + # Only fire user_joined_room if the user has acutally joined the + # room. Don't bother if the user is just changing their profile + # info. + yield user_joined_room(self.distributor, target, room_id) + elif event.membership == Membership.LEAVE: + if prev_member_event and prev_member_event.membership == Membership.JOIN: + user_left_room(self.distributor, target, room_id) + + @defer.inlineCallbacks + def remote_join(self, remote_room_hosts, room_id, user, content): + if len(remote_room_hosts) == 0: + raise SynapseError(404, "No known servers") + + # We don't do an auth check if we are doing an invite + # join dance for now, since we're kinda implicitly checking + # that we are allowed to join when we decide whether or not we + # need to do the invite/join dance. + yield self.hs.get_handlers().federation_handler.do_invite_join( + remote_room_hosts, + room_id, + user.to_string(), + content, + ) + yield user_joined_room(self.distributor, user, room_id) + + def reject_remote_invite(self, user_id, room_id, remote_room_hosts): + return self.hs.get_handlers().federation_handler.do_remotely_reject_invite( + remote_room_hosts, + room_id, + user_id + ) + + @defer.inlineCallbacks def update_membership( self, requester, @@ -107,6 +172,34 @@ class RoomMemberHandler(BaseHandler): third_party_signed=None, ratelimit=True, ): + key = (target, room_id,) + + with (yield self.member_linearizer.queue(key)): + result = yield self._update_membership( + requester, + target, + room_id, + action, + txn_id=txn_id, + remote_room_hosts=remote_room_hosts, + third_party_signed=third_party_signed, + ratelimit=ratelimit, + ) + + defer.returnValue(result) + + @defer.inlineCallbacks + def _update_membership( + self, + requester, + target, + room_id, + action, + txn_id=None, + remote_room_hosts=None, + third_party_signed=None, + ratelimit=True, + ): effective_membership_state = action if action in ["kick", "unban"]: effective_membership_state = "leave" @@ -120,28 +213,15 @@ class RoomMemberHandler(BaseHandler): third_party_signed, ) - msg_handler = self.hs.get_handlers().message_handler + if not remote_room_hosts: + remote_room_hosts = [] - content = {"membership": effective_membership_state} - if requester.is_guest: - content["kind"] = "guest" - - event, context = yield msg_handler.create_event( - { - "type": EventTypes.Member, - "content": content, - "room_id": room_id, - "sender": requester.user.to_string(), - "state_key": target.to_string(), - - # For backwards compatibility: - "membership": effective_membership_state, - }, - token_id=requester.access_token_id, - txn_id=txn_id, + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + current_state = yield self.state_handler.get_current_state( + room_id, latest_event_ids=latest_event_ids, ) - old_state = context.current_state.get((EventTypes.Member, event.state_key)) + old_state = current_state.get((EventTypes.Member, target.to_string())) old_membership = old_state.content.get("membership") if old_state else None if action == "unban" and old_membership != "ban": raise SynapseError( @@ -156,13 +236,73 @@ class RoomMemberHandler(BaseHandler): errcode=Codes.BAD_STATE ) - member_handler = self.hs.get_handlers().room_member_handler - yield member_handler.send_membership_event( - requester, - event, - context, + is_host_in_room = self.is_host_in_room(current_state) + + if effective_membership_state == Membership.JOIN: + if requester.is_guest and not self._can_guest_join(current_state): + # This should be an auth check, but guests are a local concept, + # so don't really fit into the general auth process. + raise AuthError(403, "Guest access not allowed") + + if not is_host_in_room: + inviter = yield self.get_inviter(target.to_string(), room_id) + if inviter and not self.hs.is_mine(inviter): + remote_room_hosts.append(inviter.domain) + + content = {"membership": Membership.JOIN} + + profile = self.hs.get_handlers().profile_handler + content["displayname"] = yield profile.get_displayname(target) + content["avatar_url"] = yield profile.get_avatar_url(target) + + if requester.is_guest: + content["kind"] = "guest" + + ret = yield self.remote_join( + remote_room_hosts, room_id, target, content + ) + defer.returnValue(ret) + + elif effective_membership_state == Membership.LEAVE: + if not is_host_in_room: + # perhaps we've been invited + inviter = yield self.get_inviter(target.to_string(), room_id) + if not inviter: + raise SynapseError(404, "Not a known room") + + if self.hs.is_mine(inviter): + # the inviter was on our server, but has now left. Carry on + # with the normal rejection codepath. + # + # This is a bit of a hack, because the room might still be + # active on other servers. + pass + else: + # send the rejection to the inviter's HS. + remote_room_hosts = remote_room_hosts + [inviter.domain] + + try: + ret = yield self.reject_remote_invite( + target.to_string(), room_id, remote_room_hosts + ) + defer.returnValue(ret) + except SynapseError as e: + logger.warn("Failed to reject invite: %s", e) + + yield self.store.locally_reject_invite( + target.to_string(), room_id + ) + + defer.returnValue({}) + + yield self._local_membership_update( + requester=requester, + target=target, + room_id=room_id, + membership=effective_membership_state, + txn_id=txn_id, ratelimit=ratelimit, - remote_room_hosts=remote_room_hosts, + prev_event_ids=latest_event_ids, ) @defer.inlineCallbacks @@ -211,73 +351,19 @@ class RoomMemberHandler(BaseHandler): if prev_event is not None: return - action = "send" - if event.membership == Membership.JOIN: if requester.is_guest and not self._can_guest_join(context.current_state): # This should be an auth check, but guests are a local concept, # so don't really fit into the general auth process. raise AuthError(403, "Guest access not allowed") - do_remote_join_dance, remote_room_hosts = self._should_do_dance( - context, - (self.get_inviter(event.state_key, context.current_state)), - remote_room_hosts, - ) - if do_remote_join_dance: - action = "remote_join" - elif event.membership == Membership.LEAVE: - is_host_in_room = self.is_host_in_room(context.current_state) - - if not is_host_in_room: - # perhaps we've been invited - inviter = self.get_inviter( - target_user.to_string(), context.current_state - ) - if not inviter: - raise SynapseError(404, "Not a known room") - if self.hs.is_mine(inviter): - # the inviter was on our server, but has now left. Carry on - # with the normal rejection codepath. - # - # This is a bit of a hack, because the room might still be - # active on other servers. - pass - else: - # send the rejection to the inviter's HS. - remote_room_hosts = remote_room_hosts + [inviter.domain] - action = "remote_reject" - - federation_handler = self.hs.get_handlers().federation_handler - - if action == "remote_join": - if len(remote_room_hosts) == 0: - raise SynapseError(404, "No known servers") - - # We don't do an auth check if we are doing an invite - # join dance for now, since we're kinda implicitly checking - # that we are allowed to join when we decide whether or not we - # need to do the invite/join dance. - yield federation_handler.do_invite_join( - remote_room_hosts, - event.room_id, - event.user_id, - event.content, - ) - elif action == "remote_reject": - yield federation_handler.do_remotely_reject_invite( - remote_room_hosts, - room_id, - event.user_id - ) - else: - yield self.handle_new_client_event( - requester, - event, - context, - extra_users=[target_user], - ratelimit=ratelimit, - ) + yield self.handle_new_client_event( + requester, + event, + context, + extra_users=[target_user], + ratelimit=ratelimit, + ) prev_member_event = context.current_state.get( (EventTypes.Member, target_user.to_string()), @@ -306,19 +392,6 @@ class RoomMemberHandler(BaseHandler): and guest_access.content["guest_access"] == "can_join" ) - def _should_do_dance(self, context, inviter, room_hosts=None): - # TODO: Shouldn't this be remote_room_host? - room_hosts = room_hosts or [] - - is_host_in_room = self.is_host_in_room(context.current_state) - if is_host_in_room: - return False, room_hosts - - if inviter and not self.hs.is_mine(inviter): - room_hosts.append(inviter.domain) - - return True, room_hosts - @defer.inlineCallbacks def lookup_room_alias(self, room_alias): """ @@ -344,11 +417,14 @@ class RoomMemberHandler(BaseHandler): defer.returnValue((RoomID.from_string(room_id), servers)) - def get_inviter(self, user_id, current_state): - prev_state = current_state.get((EventTypes.Member, user_id)) - if prev_state and prev_state.membership == Membership.INVITE: - return UserID.from_string(prev_state.user_id) - return None + @defer.inlineCallbacks + def get_inviter(self, user_id, room_id): + invite = yield self.store.get_invite_for_user_in_room( + user_id=user_id, + room_id=room_id, + ) + if invite: + defer.returnValue(UserID.from_string(invite.sender)) @defer.inlineCallbacks def get_joined_rooms_for_user(self, user): |