From c906f3066152ba7a65c0765a5812b71bb8a4016c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 Apr 2016 16:17:32 +0100 Subject: Do checks for memberships before creating events --- synapse/handlers/room_member.py | 237 ++++++++++++++++++++++++---------------- 1 file changed, 144 insertions(+), 93 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 5fdbd3adcc..09b8f6217a 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -95,6 +95,80 @@ class RoomMemberHandler(BaseHandler): if remotedomains is not None: remotedomains.add(member.domain) + @defer.inlineCallbacks + def _local_membership_update( + self, requester, target, room_id, membership, + txn_id=None, + ratelimit=True, + ): + msg_handler = self.hs.get_handlers().message_handler + + content = {"membership": membership} + if requester.is_guest: + content["kind"] = "guest" + + event, context = yield msg_handler.create_event( + { + "type": EventTypes.Member, + "content": content, + "room_id": room_id, + "sender": requester.user.to_string(), + "state_key": target.to_string(), + + # For backwards compatibility: + "membership": membership, + }, + token_id=requester.access_token_id, + txn_id=txn_id, + ) + + yield self.handle_new_client_event( + requester, + event, + context, + extra_users=[target], + ratelimit=ratelimit, + ) + + prev_member_event = context.current_state.get( + (EventTypes.Member, target.to_string()), + None + ) + + if event.membership == Membership.JOIN: + if not prev_member_event or prev_member_event.membership != Membership.JOIN: + # Only fire user_joined_room if the user has acutally joined the + # room. Don't bother if the user is just changing their profile + # info. + yield user_joined_room(self.distributor, target, room_id) + elif event.membership == Membership.LEAVE: + if prev_member_event and prev_member_event.membership == Membership.JOIN: + user_left_room(self.distributor, target, room_id) + + @defer.inlineCallbacks + def remote_join(self, remote_room_hosts, room_id, user, content): + if len(remote_room_hosts) == 0: + raise SynapseError(404, "No known servers") + + # We don't do an auth check if we are doing an invite + # join dance for now, since we're kinda implicitly checking + # that we are allowed to join when we decide whether or not we + # need to do the invite/join dance. + yield self.hs.get_handlers().federation_handler.do_invite_join( + remote_room_hosts, + room_id, + user.to_string(), + content, + ) + yield user_joined_room(self.distributor, user, room_id) + + def reject_remote_invite(self, user_id, room_id, remote_room_hosts): + return self.hs.get_handlers().federation_handler.do_remotely_reject_invite( + remote_room_hosts, + room_id, + user_id + ) + @defer.inlineCallbacks def update_membership( self, @@ -120,28 +194,15 @@ class RoomMemberHandler(BaseHandler): third_party_signed, ) - msg_handler = self.hs.get_handlers().message_handler - - content = {"membership": effective_membership_state} - if requester.is_guest: - content["kind"] = "guest" + if not remote_room_hosts: + remote_room_hosts = [] - event, context = yield msg_handler.create_event( - { - "type": EventTypes.Member, - "content": content, - "room_id": room_id, - "sender": requester.user.to_string(), - "state_key": target.to_string(), - - # For backwards compatibility: - "membership": effective_membership_state, - }, - token_id=requester.access_token_id, - txn_id=txn_id, + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + current_state = yield self.state_handler.get_current_state( + room_id, latest_event_ids=latest_event_ids, ) - old_state = context.current_state.get((EventTypes.Member, event.state_key)) + old_state = current_state.get((EventTypes.Member, target.to_string())) old_membership = old_state.content.get("membership") if old_state else None if action == "unban" and old_membership != "ban": raise SynapseError( @@ -156,13 +217,57 @@ class RoomMemberHandler(BaseHandler): errcode=Codes.BAD_STATE ) - member_handler = self.hs.get_handlers().room_member_handler - yield member_handler.send_membership_event( - requester, - event, - context, + is_host_in_room = self.is_host_in_room(current_state) + + if effective_membership_state == Membership.JOIN: + if requester.is_guest and not self._can_guest_join(current_state): + # This should be an auth check, but guests are a local concept, + # so don't really fit into the general auth process. + raise AuthError(403, "Guest access not allowed") + + if not is_host_in_room: + inviter = yield self.get_inviter(target.to_string(), room_id) + if inviter and not self.hs.is_mine(inviter): + remote_room_hosts.append(inviter.domain) + + content = {"membership": Membership.JOIN} + if requester.is_guest: + content["kind"] = "guest" + + ret = yield self.remote_join( + remote_room_hosts, room_id, target, content + ) + defer.returnValue(ret) + + elif effective_membership_state == Membership.LEAVE: + if not is_host_in_room: + # perhaps we've been invited + inviter = yield self.get_inviter(target.to_string(), room_id) + if not inviter: + raise SynapseError(404, "Not a known room") + + if self.hs.is_mine(inviter): + # the inviter was on our server, but has now left. Carry on + # with the normal rejection codepath. + # + # This is a bit of a hack, because the room might still be + # active on other servers. + pass + else: + # send the rejection to the inviter's HS. + remote_room_hosts = remote_room_hosts + [inviter.domain] + ret = yield self.reject_remote_invite( + target.to_string(), room_id, remote_room_hosts + ) + defer.returnValue(ret) + + yield self._local_membership_update( + requester=requester, + target=target, + room_id=room_id, + membership=effective_membership_state, + txn_id=txn_id, ratelimit=ratelimit, - remote_room_hosts=remote_room_hosts, ) @defer.inlineCallbacks @@ -211,73 +316,19 @@ class RoomMemberHandler(BaseHandler): if prev_event is not None: return - action = "send" - if event.membership == Membership.JOIN: if requester.is_guest and not self._can_guest_join(context.current_state): # This should be an auth check, but guests are a local concept, # so don't really fit into the general auth process. raise AuthError(403, "Guest access not allowed") - do_remote_join_dance, remote_room_hosts = self._should_do_dance( - context, - (self.get_inviter(event.state_key, context.current_state)), - remote_room_hosts, - ) - if do_remote_join_dance: - action = "remote_join" - elif event.membership == Membership.LEAVE: - is_host_in_room = self.is_host_in_room(context.current_state) - - if not is_host_in_room: - # perhaps we've been invited - inviter = self.get_inviter( - target_user.to_string(), context.current_state - ) - if not inviter: - raise SynapseError(404, "Not a known room") - if self.hs.is_mine(inviter): - # the inviter was on our server, but has now left. Carry on - # with the normal rejection codepath. - # - # This is a bit of a hack, because the room might still be - # active on other servers. - pass - else: - # send the rejection to the inviter's HS. - remote_room_hosts = remote_room_hosts + [inviter.domain] - action = "remote_reject" - - federation_handler = self.hs.get_handlers().federation_handler - - if action == "remote_join": - if len(remote_room_hosts) == 0: - raise SynapseError(404, "No known servers") - - # We don't do an auth check if we are doing an invite - # join dance for now, since we're kinda implicitly checking - # that we are allowed to join when we decide whether or not we - # need to do the invite/join dance. - yield federation_handler.do_invite_join( - remote_room_hosts, - event.room_id, - event.user_id, - event.content, - ) - elif action == "remote_reject": - yield federation_handler.do_remotely_reject_invite( - remote_room_hosts, - room_id, - event.user_id - ) - else: - yield self.handle_new_client_event( - requester, - event, - context, - extra_users=[target_user], - ratelimit=ratelimit, - ) + yield self.handle_new_client_event( + requester, + event, + context, + extra_users=[target_user], + ratelimit=ratelimit, + ) prev_member_event = context.current_state.get( (EventTypes.Member, target_user.to_string()), @@ -306,11 +357,11 @@ class RoomMemberHandler(BaseHandler): and guest_access.content["guest_access"] == "can_join" ) - def _should_do_dance(self, context, inviter, room_hosts=None): + def _should_do_dance(self, current_state, inviter, room_hosts=None): # TODO: Shouldn't this be remote_room_host? room_hosts = room_hosts or [] - is_host_in_room = self.is_host_in_room(context.current_state) + is_host_in_room = self.is_host_in_room(current_state) if is_host_in_room: return False, room_hosts @@ -344,11 +395,11 @@ class RoomMemberHandler(BaseHandler): defer.returnValue((RoomID.from_string(room_id), servers)) - def get_inviter(self, user_id, current_state): - prev_state = current_state.get((EventTypes.Member, user_id)) - if prev_state and prev_state.membership == Membership.INVITE: - return UserID.from_string(prev_state.user_id) - return None + @defer.inlineCallbacks + def get_inviter(self, user_id, room_id): + invite = yield self.store.get_room_member(user_id=user_id, room_id=room_id) + if invite: + defer.returnValue(UserID.from_string(invite.sender)) @defer.inlineCallbacks def get_joined_rooms_for_user(self, user): -- cgit 1.5.1 From aa82cb38e97faa4ad1c15109cbc5b02647ea2461 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 Apr 2016 16:36:54 +0100 Subject: Remove state hack from _create_new_client_event --- synapse/handlers/_base.py | 43 ------------------------------------------- 1 file changed, 43 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index d407eaeee9..2d8ff79a9e 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -221,49 +221,6 @@ class BaseHandler(object): context = yield state_handler.compute_event_context(builder) - # If we've received an invite over federation, there are no latest - # events in the room, because we don't know enough about the graph - # fragment we received to treat it like a graph, so the above returned - # no relevant events. It may have returned some events (if we have - # joined and left the room), but not useful ones, like the invite. - if ( - not self.is_host_in_room(context.current_state) and - builder.type == EventTypes.Member - ): - prev_member_event = yield self.store.get_room_member( - builder.sender, builder.room_id - ) - - # The prev_member_event may already be in context.current_state, - # despite us not being present in the room; in particular, if - # inviting user, and all other local users, have already left. - # - # In that case, we have all the information we need, and we don't - # want to drop "context" - not least because we may need to handle - # the invite locally, which will require us to have the whole - # context (not just prev_member_event) to auth it. - # - context_event_ids = ( - e.event_id for e in context.current_state.values() - ) - - if ( - prev_member_event and - prev_member_event.event_id not in context_event_ids - ): - # The prev_member_event is missing from context, so it must - # have arrived over federation and is an outlier. We forcibly - # set our context to the invite we received over federation - builder.prev_events = ( - prev_member_event.event_id, - prev_member_event.prev_events - ) - - context = yield state_handler.compute_event_context( - builder, - old_state=(prev_member_event,) - ) - if builder.is_state(): builder.prev_state = yield self.store.add_event_hashes( context.prev_state_events -- cgit 1.5.1 From d76d89323c4b962b4f3ff72a3c9b40f2d2d347b3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 Apr 2016 17:39:32 +0100 Subject: Use computed prev event ids --- synapse/handlers/_base.py | 29 +++++++++++++++++------------ synapse/handlers/message.py | 6 +++++- synapse/handlers/room_member.py | 3 +++ synapse/storage/event_federation.py | 16 ++++++++++++++++ 4 files changed, 41 insertions(+), 13 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 2d8ff79a9e..242afa1564 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -199,20 +199,25 @@ class BaseHandler(object): ) @defer.inlineCallbacks - def _create_new_client_event(self, builder): - latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room( - builder.room_id, - ) - - if latest_ret: - depth = max([d for _, _, d in latest_ret]) + 1 + def _create_new_client_event(self, builder, prev_event_ids=None): + if prev_event_ids: + prev_events = yield self.store.add_event_hashes(prev_event_ids) + prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) + depth = prev_max_depth + 1 else: - depth = 1 + latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room( + builder.room_id, + ) - prev_events = [ - (event_id, prev_hashes) - for event_id, prev_hashes, _ in latest_ret - ] + if latest_ret: + depth = max([d for _, _, d in latest_ret]) + 1 + else: + depth = 1 + + prev_events = [ + (event_id, prev_hashes) + for event_id, prev_hashes, _ in latest_ret + ] builder.prev_events = prev_events builder.depth = depth diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 0bb111d047..10608c0dd9 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -176,7 +176,7 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) @defer.inlineCallbacks - def create_event(self, event_dict, token_id=None, txn_id=None): + def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None): """ Given a dict from a client, create a new event. @@ -187,6 +187,9 @@ class MessageHandler(BaseHandler): Args: event_dict (dict): An entire event + token_id (str) + txn_id (str) + prev_event_ids (list): The prev event ids to use when creating the event Returns: Tuple of created event (FrozenEvent), Context @@ -225,6 +228,7 @@ class MessageHandler(BaseHandler): event, context = yield self._create_new_client_event( builder=builder, + prev_event_ids=prev_event_ids, ) defer.returnValue((event, context)) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 09b8f6217a..7c1bb8cfe4 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -98,6 +98,7 @@ class RoomMemberHandler(BaseHandler): @defer.inlineCallbacks def _local_membership_update( self, requester, target, room_id, membership, + prev_event_ids, txn_id=None, ratelimit=True, ): @@ -120,6 +121,7 @@ class RoomMemberHandler(BaseHandler): }, token_id=requester.access_token_id, txn_id=txn_id, + prev_event_ids=prev_event_ids, ) yield self.handle_new_client_event( @@ -268,6 +270,7 @@ class RoomMemberHandler(BaseHandler): membership=effective_membership_state, txn_id=txn_id, ratelimit=ratelimit, + prev_event_ids=latest_event_ids, ) @defer.inlineCallbacks diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 3489315e0d..0827946207 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -163,6 +163,22 @@ class EventFederationStore(SQLBaseStore): room_id, ) + @defer.inlineCallbacks + def get_max_depth_of_events(self, event_ids): + sql = ( + "SELECT MAX(depth) FROM events WHERE event_id IN (%s)" + ) % (",".join(["?"] * len(event_ids)),) + + rows = yield self._execute( + "get_max_depth_of_events", None, + sql, *event_ids + ) + + if rows: + defer.returnValue(rows[0][0]) + else: + defer.returnValue(1) + def _get_min_depth_interaction(self, txn, room_id): min_depth = self._simple_select_one_onecol_txn( txn, -- cgit 1.5.1 From 3d76b7cb2ba05fbf17be0a6647f39c419f428c16 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 4 Apr 2016 15:52:01 +0100 Subject: Store invites in a separate table. --- synapse/handlers/room_member.py | 2 +- synapse/storage/events.py | 13 +--- synapse/storage/prepare_database.py | 2 +- synapse/storage/roommember.py | 111 ++++++++++++++++++++++------ synapse/storage/schema/delta/31/invites.sql | 28 +++++++ 5 files changed, 124 insertions(+), 32 deletions(-) create mode 100644 synapse/storage/schema/delta/31/invites.sql (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 7c1bb8cfe4..98e346d48e 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -400,7 +400,7 @@ class RoomMemberHandler(BaseHandler): @defer.inlineCallbacks def get_inviter(self, user_id, room_id): - invite = yield self.store.get_room_member(user_id=user_id, room_id=room_id) + invite = yield self.store.get_inviter(user_id=user_id, room_id=room_id) if invite: defer.returnValue(UserID.from_string(invite.sender)) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c4dc3b3d51..5d299a1132 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -367,7 +367,8 @@ class EventsStore(SQLBaseStore): event for event, _ in events_and_contexts if event.type == EventTypes.Member - ] + ], + backfilled=backfilled, ) def event_dict(event): @@ -485,14 +486,8 @@ class EventsStore(SQLBaseStore): return for event, _ in state_events_and_contexts: - if (not event.internal_metadata.is_invite_from_remote() - and event.internal_metadata.is_outlier()): - # Outlier events generally shouldn't clobber the current state. - # However invites from remote severs for rooms we aren't in - # are a bit special: they don't come with any associated - # state so are technically an outlier, however all the - # client-facing code assumes that they are in the current - # state table so we insert the event anyway. + if event.internal_metadata.is_outlier(): + # Outlier events shouldn't clobber the current state. continue if context.rejected: diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 3f29aad1e8..4099387ba7 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 30 +SCHEMA_VERSION = 31 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 430b49c12e..4c026b33ae 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -36,7 +36,7 @@ RoomsForUser = namedtuple( class RoomMemberStore(SQLBaseStore): - def _store_room_members_txn(self, txn, events): + def _store_room_members_txn(self, txn, events, backfilled): """Store a room member in the database. """ self._simple_insert_many_txn( @@ -62,6 +62,41 @@ class RoomMemberStore(SQLBaseStore): self._membership_stream_cache.entity_has_changed, event.state_key, event.internal_metadata.stream_ordering ) + txn.call_after( + self.get_invited_rooms_for_user.invalidate, (event.state_key,) + ) + + is_mine = self.hs.is_mine_id(event.state_key) + is_new_state = not backfilled and ( + not event.internal_metadata.is_outlier() + or event.internal_metadata.is_invite_from_remote() + ) + if is_new_state and is_mine: + if event.membership == Membership.INVITE: + self._simple_insert_txn( + txn, + table="invites", + values={ + "event_id": event.event_id, + "invitee": event.state_key, + "inviter": event.sender, + "room_id": event.room_id, + "stream_id": event.internal_metadata.stream_ordering, + } + ) + else: + sql = ( + "UPDATE invites SET stream_id = ?, replaced_by = ? WHERE" + " room_id = ? AND invitee = ? AND locally_rejected is NULL" + " AND replaced_by is NULL" + ) + + txn.execute(sql, ( + event.internal_metadata.stream_ordering, + event.event_id, + event.room_id, + event.state_key, + )) def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. @@ -127,6 +162,14 @@ class RoomMemberStore(SQLBaseStore): user_id, [Membership.INVITE] ) + @defer.inlineCallbacks + def get_inviter(self, user_id, room_id): + invites = yield self.get_invited_rooms_for_user(user_id) + for invite in invites: + if invite.room_id == room_id: + defer.returnValue(invite) + defer.returnValue(None) + def get_leave_and_ban_events_for_user(self, user_id): """ Get all the leave events for a user Args: @@ -163,29 +206,55 @@ class RoomMemberStore(SQLBaseStore): def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id, membership_list): - where_clause = "user_id = ? AND (%s) AND forgotten = 0" % ( - " OR ".join(["membership = ?" for _ in membership_list]), - ) - args = [user_id] - args.extend(membership_list) + do_invite = Membership.INVITE in membership_list + membership_list = [m for m in membership_list if m != Membership.INVITE] - sql = ( - "SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering" - " FROM current_state_events as c" - " INNER JOIN room_memberships as m" - " ON m.event_id = c.event_id" - " INNER JOIN events as e" - " ON e.event_id = c.event_id" - " AND m.room_id = c.room_id" - " AND m.user_id = c.state_key" - " WHERE %s" - ) % (where_clause,) + results = [] + if membership_list: + where_clause = "user_id = ? AND (%s) AND forgotten = 0" % ( + " OR ".join(["membership = ?" for _ in membership_list]), + ) + + args = [user_id] + args.extend(membership_list) + + sql = ( + "SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering" + " FROM current_state_events as c" + " INNER JOIN room_memberships as m" + " ON m.event_id = c.event_id" + " INNER JOIN events as e" + " ON e.event_id = c.event_id" + " AND m.room_id = c.room_id" + " AND m.user_id = c.state_key" + " WHERE %s" + ) % (where_clause,) + + txn.execute(sql, args) + results = [ + RoomsForUser(**r) for r in self.cursor_to_dict(txn) + ] + + if do_invite: + sql = ( + "SELECT i.room_id, inviter, i.event_id, e.stream_ordering" + " FROM invites as i" + " INNER JOIN events as e USING (event_id)" + " WHERE invitee = ? AND locally_rejected is NULL" + " AND replaced_by is NULL" + ) + + txn.execute(sql, (user_id,)) + results.extend(RoomsForUser( + room_id=r["room_id"], + sender=r["inviter"], + event_id=r["event_id"], + stream_ordering=r["stream_ordering"], + membership=Membership.INVITE, + ) for r in self.cursor_to_dict(txn)) - txn.execute(sql, args) - return [ - RoomsForUser(**r) for r in self.cursor_to_dict(txn) - ] + return results @cached(max_entries=5000) def get_joined_hosts_for_room(self, room_id): diff --git a/synapse/storage/schema/delta/31/invites.sql b/synapse/storage/schema/delta/31/invites.sql new file mode 100644 index 0000000000..4f6fb9ea63 --- /dev/null +++ b/synapse/storage/schema/delta/31/invites.sql @@ -0,0 +1,28 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE invites( + stream_id BIGINT NOT NULL, + inviter TEXT NOT NULL, + invitee TEXT NOT NULL, + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + locally_rejected TEXT, + replaced_by TEXT +); + +CREATE INDEX invites_id ON invites(stream_id); +CREATE INDEX invites_for_user_idx ON invites(invitee, locally_rejected, replaced_by, room_id); -- cgit 1.5.1 From 0c53d750e7145f57ed97c544efdb846cc9e37b67 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 4 Apr 2016 18:02:48 +0100 Subject: Docs and indents --- synapse/handlers/room_member.py | 5 ++++- synapse/storage/roommember.py | 18 ++++++++++++++++-- synapse/storage/schema/delta/31/invites.sql | 22 +++++++++++----------- 3 files changed, 31 insertions(+), 14 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 98e346d48e..f1c3e90ecd 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -400,7 +400,10 @@ class RoomMemberHandler(BaseHandler): @defer.inlineCallbacks def get_inviter(self, user_id, room_id): - invite = yield self.store.get_inviter(user_id=user_id, room_id=room_id) + invite = yield self.store.get_invite_for_user_in_room( + user_id=user_id, + room_id=room_id, + ) if invite: defer.returnValue(UserID.from_string(invite.sender)) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index abe5942744..36456a75fc 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -66,11 +66,15 @@ class RoomMemberStore(SQLBaseStore): self.get_invited_rooms_for_user.invalidate, (event.state_key,) ) - is_mine = self.hs.is_mine_id(event.state_key) + # We update the local_invites table only if the event is "current", + # i.e., its something that has just happened. + # The only current event that can also be an outlier is if its an + # invite that has come in across federation. is_new_state = not backfilled and ( not event.internal_metadata.is_outlier() or event.internal_metadata.is_invite_from_remote() ) + is_mine = self.hs.is_mine_id(event.state_key) if is_new_state and is_mine: if event.membership == Membership.INVITE: self._simple_insert_txn( @@ -163,7 +167,17 @@ class RoomMemberStore(SQLBaseStore): ) @defer.inlineCallbacks - def get_inviter(self, user_id, room_id): + def get_invite_for_user_in_room(self, user_id, room_id): + """Gets the invite for the given user and room + + Args: + user_id (str) + room_id (str) + + Returns: + Deferred: Resolves to either a RoomsForUser or None if no invite was + found. + """ invites = yield self.get_invited_rooms_for_user(user_id) for invite in invites: if invite.room_id == room_id: diff --git a/synapse/storage/schema/delta/31/invites.sql b/synapse/storage/schema/delta/31/invites.sql index 1c83430da4..2c57846d5a 100644 --- a/synapse/storage/schema/delta/31/invites.sql +++ b/synapse/storage/schema/delta/31/invites.sql @@ -26,17 +26,17 @@ CREATE TABLE local_invites( -- Insert all invites for local users into new `invites` table INSERT INTO local_invites SELECT - stream_ordering as stream_id, - sender as inviter, - state_key as invitee, - event_id, - room_id, - NULL as locally_rejected, - NULL as replaced_by -FROM events -NATURAL JOIN current_state_events -NATURAL JOIN room_memberships -WHERE membership = 'invite' AND state_key IN (SELECT name FROM users); + stream_ordering as stream_id, + sender as inviter, + state_key as invitee, + event_id, + room_id, + NULL as locally_rejected, + NULL as replaced_by + FROM events + NATURAL JOIN current_state_events + NATURAL JOIN room_memberships + WHERE membership = 'invite' AND state_key IN (SELECT name FROM users); CREATE INDEX local_invites_id ON local_invites(stream_id); CREATE INDEX local_invites_for_user_idx ON local_invites(invitee, locally_rejected, replaced_by, room_id); -- cgit 1.5.1 From df727f212606b771b1410c8e322fb8a99d159de4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Apr 2016 11:13:24 +0100 Subject: Fix stuck invites If rejecting a remote invite fails with an error response don't fail the entire request; instead mark the invite as locally rejected. This fixes the bug where users can get stuck invites which they can neither accept nor reject. --- synapse/handlers/federation.py | 34 +++++++++++++++++++++++----------- synapse/handlers/room_member.py | 18 ++++++++++++++---- synapse/storage/__init__.py | 3 ++- synapse/storage/roommember.py | 19 +++++++++++++++++++ 4 files changed, 58 insertions(+), 16 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 4049c01d26..19769eecd7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -784,13 +784,19 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def do_remotely_reject_invite(self, target_hosts, room_id, user_id): - origin, event = yield self._make_and_verify_event( - target_hosts, - room_id, - user_id, - "leave" - ) - signed_event = self._sign_event(event) + try: + origin, event = yield self._make_and_verify_event( + target_hosts, + room_id, + user_id, + "leave" + ) + signed_event = self._sign_event(event) + except SynapseError: + raise + except CodeMessageException as e: + logger.warn("Failed to reject invite: %s", e) + raise SynapseError(500, "Failed to reject invite") # Try the host we successfully got a response to /make_join/ # request first. @@ -800,10 +806,16 @@ class FederationHandler(BaseHandler): except ValueError: pass - yield self.replication_layer.send_leave( - target_hosts, - signed_event - ) + try: + yield self.replication_layer.send_leave( + target_hosts, + signed_event + ) + except SynapseError: + raise + except CodeMessageException as e: + logger.warn("Failed to reject invite: %s", e) + raise SynapseError(500, "Failed to reject invite") context = yield self.state_handler.compute_event_context(event) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index f1c3e90ecd..6c7409215a 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -258,10 +258,20 @@ class RoomMemberHandler(BaseHandler): else: # send the rejection to the inviter's HS. remote_room_hosts = remote_room_hosts + [inviter.domain] - ret = yield self.reject_remote_invite( - target.to_string(), room_id, remote_room_hosts - ) - defer.returnValue(ret) + + try: + ret = yield self.reject_remote_invite( + target.to_string(), room_id, remote_room_hosts + ) + defer.returnValue(ret) + except SynapseError as e: + logger.warn("Failed to reject invite: %s", e) + + yield self.store.locally_reject_invite( + target.to_string(), room_id + ) + + defer.returnValue({}) yield self._local_membership_update( requester=requester, diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 57863bba4d..07916b292d 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -94,7 +94,8 @@ class DataStore(RoomMemberStore, RoomStore, ) self._stream_id_gen = StreamIdGenerator( - db_conn, "events", "stream_ordering" + db_conn, "events", "stream_ordering", + extra_tables=[("local_invites", "stream_id")] ) self._backfill_id_gen = StreamIdGenerator( db_conn, "events", "stream_ordering", step=-1 diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 36456a75fc..66e7a40e3c 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -102,6 +102,25 @@ class RoomMemberStore(SQLBaseStore): event.state_key, )) + @defer.inlineCallbacks + def locally_reject_invite(self, user_id, room_id): + sql = ( + "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE" + " room_id = ? AND invitee = ? AND locally_rejected is NULL" + " AND replaced_by is NULL" + ) + + def f(txn, stream_ordering): + txn.execute(sql, ( + stream_ordering, + True, + room_id, + user_id, + )) + + with self._stream_id_gen.get_next() as stream_ordering: + yield self.runInteraction("locally_reject_invite", f, stream_ordering) + def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. -- cgit 1.5.1 From 6222ae51cee230fc746d0706db13d8928f28234b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Apr 2016 12:56:29 +0100 Subject: Don't backfill from self --- synapse/handlers/federation.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 267fedf114..edffa560bf 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -289,6 +289,9 @@ class FederationHandler(BaseHandler): def backfill(self, dest, room_id, limit, extremities=[]): """ Trigger a backfill request to `dest` for the given `room_id` """ + if dest == self.server_name: + raise SynapseError(400, "Can't backfill from self.") + if not extremities: extremities = yield self.store.get_oldest_events_in_room(room_id) @@ -455,7 +458,7 @@ class FederationHandler(BaseHandler): likely_domains = [ domain for domain, depth in curr_domains - if domain is not self.server_name + if domain != self.server_name ] @defer.inlineCallbacks -- cgit 1.5.1 From 1e05637e37f62445d84e43ae89e441f1833a32e2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 6 Apr 2016 15:19:45 +0100 Subject: Let users see their own leave events ... otherwise clients get confused. Fixes https://matrix.org/jira/browse/SYN-662, https://github.com/vector-im/vector-web/issues/368 --- synapse/handlers/_base.py | 51 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 11 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index c77afe7f51..88d8b9ba54 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -37,6 +37,15 @@ VISIBILITY_PRIORITY = ( ) +MEMBERSHIP_PRIORITY = ( + Membership.JOIN, + Membership.INVITE, + Membership.KNOCK, + Membership.LEAVE, + Membership.BAN, +) + + class BaseHandler(object): """ Common base class for the event handlers. @@ -72,6 +81,7 @@ class BaseHandler(object): * the user is not currently a member of the room, and: * the user has not been a member of the room since the given events + events ([synapse.events.EventBase]): list of events to filter """ forgotten = yield defer.gatherResults([ self.store.who_forgot_in_room( @@ -86,6 +96,12 @@ class BaseHandler(object): ) def allowed(event, user_id, is_peeking): + """ + Args: + event (synapse.events.EventBase): event to check + user_id (str) + is_peeking (bool) + """ state = event_id_to_state[event.event_id] # get the room_visibility at the time of the event. @@ -117,17 +133,30 @@ class BaseHandler(object): if old_priority < new_priority: visibility = prev_visibility - # get the user's membership at the time of the event. (or rather, - # just *after* the event. Which means that people can see their - # own join events, but not (currently) their own leave events.) - membership_event = state.get((EventTypes.Member, user_id), None) - if membership_event: - if membership_event.event_id in event_id_forgotten: - membership = None - else: - membership = membership_event.membership - else: - membership = None + # likewise, if the event is the user's own membership event, use + # the 'most joined' membership + membership = None + if event.type == EventTypes.Member and event.state_key == user_id: + membership = event.content.get("membership", None) + if membership not in MEMBERSHIP_PRIORITY: + membership = "leave" + + prev_content = event.unsigned.get("prev_content", {}) + prev_membership = prev_content.get("membership", None) + if prev_membership not in MEMBERSHIP_PRIORITY: + prev_membership = "leave" + + new_priority = MEMBERSHIP_PRIORITY.index(membership) + old_priority = MEMBERSHIP_PRIORITY.index(prev_membership) + if old_priority < new_priority: + membership = prev_membership + + # otherwise, get the user's membership at the time of the event. + if membership is None: + membership_event = state.get((EventTypes.Member, user_id), None) + if membership_event: + if membership_event.event_id not in event_id_forgotten: + membership = membership_event.membership # if the user was a member of the room at the time of the event, # they can see it. -- cgit 1.5.1 From 1ef036567051218c38de5529472d3c9000c6960d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Apr 2016 09:42:52 +0100 Subject: Set profile information when joining rooms remotely --- synapse/handlers/room_member.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index fe2315df8f..8c41cb6f3c 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -233,6 +233,11 @@ class RoomMemberHandler(BaseHandler): remote_room_hosts.append(inviter.domain) content = {"membership": Membership.JOIN} + + profile = self.hs.get_handlers().profile_handler + content["displayname"] = yield profile.get_displayname(target) + content["avatar_url"] = yield profile.get_avatar_url(target) + if requester.is_guest: content["kind"] = "guest" -- cgit 1.5.1 From af03ecf35223f93971596f38393c62f4694705fa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Apr 2016 15:44:22 +0100 Subject: Deduplicate joins --- synapse/handlers/room_member.py | 31 ++++++++++++++++++++++++ synapse/util/async.py | 42 +++++++++++++++++++++++++++++++++ synapse/util/caches/response_cache.py | 2 +- tests/util/test_linearizer.py | 44 +++++++++++++++++++++++++++++++++++ 4 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 tests/util/test_linearizer.py (limited to 'synapse/handlers') diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index fe2315df8f..0fcc9445a8 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -24,6 +24,7 @@ from synapse.api.constants import ( ) from synapse.api.errors import AuthError, SynapseError, Codes from synapse.util.logcontext import preserve_context_over_fn +from synapse.util.async import Linearizer from signedjson.sign import verify_signed_json from signedjson.key import decode_verify_key_bytes @@ -60,6 +61,8 @@ class RoomMemberHandler(BaseHandler): def __init__(self, hs): super(RoomMemberHandler, self).__init__(hs) + self.member_linearizer = Linearizer() + self.clock = hs.get_clock() self.distributor = hs.get_distributor() @@ -182,6 +185,34 @@ class RoomMemberHandler(BaseHandler): remote_room_hosts=None, third_party_signed=None, ratelimit=True, + ): + key = (target, room_id,) + + with (yield self.member_linearizer.queue(key)): + result = yield self._update_membership( + requester, + target, + room_id, + action, + txn_id=txn_id, + remote_room_hosts=remote_room_hosts, + third_party_signed=third_party_signed, + ratelimit=ratelimit, + ) + + defer.returnValue(result) + + @defer.inlineCallbacks + def _update_membership( + self, + requester, + target, + room_id, + action, + txn_id=None, + remote_room_hosts=None, + third_party_signed=None, + ratelimit=True, ): effective_membership_state = action if action in ["kick", "unban"]: diff --git a/synapse/util/async.py b/synapse/util/async.py index cd4d90f3cf..408c86be91 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -19,6 +19,8 @@ from twisted.internet import defer, reactor from .logcontext import PreserveLoggingContext, preserve_fn from synapse.util import unwrapFirstError +from contextlib import contextmanager + @defer.inlineCallbacks def sleep(seconds): @@ -137,3 +139,43 @@ def concurrently_execute(func, args, limit): preserve_fn(_concurrently_execute_inner)() for _ in xrange(limit) ], consumeErrors=True).addErrback(unwrapFirstError) + + +@contextmanager +def _trigger_defer_manager(d): + try: + yield + finally: + d.callback(None) + + +class Linearizer(object): + """Linearizes access to resources based on a key. Useful to ensure only one + thing is happening at a time on a given resource. + + Example: + + with (yield linearizer.queue("test_key")): + # do some work. + + """ + def __init__(self): + self.key_to_defer = {} + + @defer.inlineCallbacks + def queue(self, key): + current_defer = self.key_to_defer.get(key) + + new_defer = defer.Deferred() + self.key_to_defer[key] = new_defer + + def remove_if_current(_): + d = self.key_to_defer.get(key) + if d is new_defer: + self.key_to_defer.pop(key, None) + + new_defer.addBoth(remove_if_current) + + yield current_defer + + defer.returnValue(_trigger_defer_manager(new_defer)) diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index be310ba320..36686b479e 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -35,7 +35,7 @@ class ResponseCache(object): return None def set(self, key, deferred): - result = ObservableDeferred(deferred) + result = ObservableDeferred(deferred, consumeErrors=True) self.pending_result_cache[key] = result def remove(r): diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py new file mode 100644 index 0000000000..afcba482f9 --- /dev/null +++ b/tests/util/test_linearizer.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from tests import unittest + +from twisted.internet import defer + +from synapse.util.async import Linearizer + + +class LinearizerTestCase(unittest.TestCase): + + @defer.inlineCallbacks + def test_linearizer(self): + linearizer = Linearizer() + + key = object() + + d1 = linearizer.queue(key) + cm1 = yield d1 + + d2 = linearizer.queue(key) + self.assertFalse(d2.called) + + with cm1: + self.assertFalse(d2.called) + + self.assertTrue(d2.called) + + with (yield d2): + pass -- cgit 1.5.1