diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/device.py | 68 | ||||
-rw-r--r-- | synapse/handlers/directory.py | 15 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 99 | ||||
-rw-r--r-- | synapse/handlers/message.py | 54 | ||||
-rw-r--r-- | synapse/handlers/room.py | 8 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 37 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 155 |
7 files changed, 372 insertions, 64 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index ed60d494ff..dac4b3f4e0 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -270,6 +270,8 @@ class DeviceHandler(BaseHandler): user_id (str) from_token (StreamToken) """ + now_token = yield self.hs.get_event_sources().get_current_token() + room_ids = yield self.store.get_rooms_for_user(user_id) # First we check if any devices have changed @@ -280,11 +282,30 @@ class DeviceHandler(BaseHandler): # Then work out if any users have since joined rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) + member_events = yield self.store.get_membership_changes_for_user( + user_id, from_token.room_key, now_token.room_key + ) + rooms_changed.update(event.room_id for event in member_events) + stream_ordering = RoomStreamToken.parse_stream_token( - from_token.room_key).stream + from_token.room_key + ).stream possibly_changed = set(changed) + possibly_left = set() for room_id in rooms_changed: + current_state_ids = yield self.store.get_current_state_ids(room_id) + + # The user may have left the room + # TODO: Check if they actually did or if we were just invited. + if room_id not in room_ids: + for key, event_id in current_state_ids.iteritems(): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_left.add(state_key) + continue + # Fetch the current state at the time. try: event_ids = yield self.store.get_forward_extremeties_for_room( @@ -295,8 +316,6 @@ class DeviceHandler(BaseHandler): # ordering: treat it the same as a new room event_ids = [] - current_state_ids = yield self.store.get_current_state_ids(room_id) - # special-case for an empty prev state: include all members # in the changed list if not event_ids: @@ -307,9 +326,25 @@ class DeviceHandler(BaseHandler): possibly_changed.add(state_key) continue + current_member_id = current_state_ids.get((EventTypes.Member, user_id)) + if not current_member_id: + continue + # mapping from event_id -> state_dict prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) + # Check if we've joined the room? If so we just blindly add all the users to + # the "possibly changed" users. + for state_dict in prev_state_ids.itervalues(): + member_event = state_dict.get((EventTypes.Member, user_id), None) + if not member_event or member_event != current_member_id: + for key, event_id in current_state_ids.iteritems(): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_changed.add(state_key) + break + # If there has been any change in membership, include them in the # possibly changed list. We'll check if they are joined below, # and we're not toooo worried about spuriously adding users. @@ -320,19 +355,30 @@ class DeviceHandler(BaseHandler): # check if this member has changed since any of the extremities # at the stream_ordering, and add them to the list if so. - for state_dict in prev_state_ids.values(): + for state_dict in prev_state_ids.itervalues(): prev_event_id = state_dict.get(key, None) if not prev_event_id or prev_event_id != event_id: - possibly_changed.add(state_key) + if state_key != user_id: + possibly_changed.add(state_key) break - users_who_share_room = yield self.store.get_users_who_share_room_with_user( - user_id - ) + if possibly_changed or possibly_left: + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) - # Take the intersection of the users whose devices may have changed - # and those that actually still share a room with the user - defer.returnValue(users_who_share_room & possibly_changed) + # Take the intersection of the users whose devices may have changed + # and those that actually still share a room with the user + possibly_joined = possibly_changed & users_who_share_room + possibly_left = (possibly_changed | possibly_left) - users_who_share_room + else: + possibly_joined = [] + possibly_left = [] + + defer.returnValue({ + "changed": list(possibly_joined), + "left": list(possibly_left), + }) @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 943554ce98..a0464ae5c0 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -40,6 +40,8 @@ class DirectoryHandler(BaseHandler): "directory", self.on_directory_query ) + self.spam_checker = hs.get_spam_checker() + @defer.inlineCallbacks def _create_association(self, room_alias, room_id, servers=None, creator=None): # general association creation for both human users and app services @@ -73,6 +75,11 @@ class DirectoryHandler(BaseHandler): # association creation for human users # TODO(erikj): Do user auth. + if not self.spam_checker.user_may_create_room_alias(user_id, room_alias): + raise SynapseError( + 403, "This user is not permitted to create this alias", + ) + can_create = yield self.can_modify_alias( room_alias, user_id=user_id @@ -327,6 +334,14 @@ class DirectoryHandler(BaseHandler): room_id (str) visibility (str): "public" or "private" """ + if not self.spam_checker.user_may_publish_room( + requester.user.to_string(), room_id + ): + raise AuthError( + 403, + "This user is not permitted to publish rooms to the room list" + ) + if requester.is_guest: raise AuthError(403, "Guests cannot edit the published room list") diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 694b820d85..7711cded01 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -14,7 +14,6 @@ # limitations under the License. """Contains handlers for federation events.""" -import synapse.util.logcontext from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json from unpaddedbase64 import decode_base64 @@ -26,10 +25,7 @@ from synapse.api.errors import ( ) from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator -from synapse.util import unwrapFirstError -from synapse.util.logcontext import ( - preserve_fn, preserve_context_over_deferred -) +from synapse.util import unwrapFirstError, logcontext from synapse.util.metrics import measure_func from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor, Linearizer @@ -77,6 +73,7 @@ class FederationHandler(BaseHandler): self.action_generator = hs.get_action_generator() self.is_mine_id = hs.is_mine_id self.pusher_pool = hs.get_pusherpool() + self.spam_checker = hs.get_spam_checker() self.replication_layer.set_handler(self) @@ -125,6 +122,28 @@ class FederationHandler(BaseHandler): self.room_queues[pdu.room_id].append((pdu, origin)) return + # If we're no longer in the room just ditch the event entirely. This + # is probably an old server that has come back and thinks we're still + # in the room (or we've been rejoined to the room by a state reset). + # + # If we were never in the room then maybe our database got vaped and + # we should check if we *are* in fact in the room. If we are then we + # can magically rejoin the room. + is_in_room = yield self.auth.check_host_in_room( + pdu.room_id, + self.server_name + ) + if not is_in_room: + was_in_room = yield self.store.was_host_joined( + pdu.room_id, self.server_name, + ) + if was_in_room: + logger.info( + "Ignoring PDU %s for room %s from %s as we've left the room!", + pdu.event_id, pdu.room_id, origin, + ) + return + state = None auth_chain = [] @@ -591,9 +610,9 @@ class FederationHandler(BaseHandler): missing_auth - failed_to_fetch ) - results = yield preserve_context_over_deferred(defer.gatherResults( + results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.replication_layer.get_pdu)( + logcontext.preserve_fn(self.replication_layer.get_pdu)( [dest], event_id, outlier=True, @@ -785,10 +804,14 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) logger.debug("calling resolve_state_groups in _maybe_backfill") - states = yield preserve_context_over_deferred(defer.gatherResults([ - preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e]) - for e in event_ids - ])) + states = yield logcontext.make_deferred_yieldable(defer.gatherResults( + [ + logcontext.preserve_fn(self.state_handler.resolve_state_groups)( + room_id, [e] + ) + for e in event_ids + ], consumeErrors=True, + )) states = dict(zip(event_ids, [s.state for s in states])) state_map = yield self.store.get_events( @@ -941,9 +964,7 @@ class FederationHandler(BaseHandler): # lots of requests for missing prev_events which we do actually # have. Hence we fire off the deferred, but don't wait for it. - synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)( - room_queue - ) + logcontext.preserve_fn(self._handle_queued_pdus)(room_queue) defer.returnValue(True) @@ -1070,10 +1091,23 @@ class FederationHandler(BaseHandler): """ event = pdu + if event.state_key is None: + raise SynapseError(400, "The invite event did not have a state key") + is_blocked = yield self.store.is_room_blocked(event.room_id) if is_blocked: raise SynapseError(403, "This room has been blocked on this server") + if self.hs.config.block_non_admin_invites: + raise SynapseError(403, "This server does not accept room invites") + + if not self.spam_checker.user_may_invite( + event.sender, event.state_key, event.room_id, + ): + raise SynapseError( + 403, "This user is not permitted to send invites to this server/user" + ) + membership = event.content.get("membership") if event.type != EventTypes.Member or membership != Membership.INVITE: raise SynapseError(400, "The event was not an m.room.member invite event") @@ -1082,9 +1116,6 @@ class FederationHandler(BaseHandler): if sender_domain != origin: raise SynapseError(400, "The invite event was not from the server sending it") - if event.state_key is None: - raise SynapseError(400, "The invite event did not have a state key") - if not self.is_mine_id(event.state_key): raise SynapseError(400, "The invite event must be for this server") @@ -1413,7 +1444,7 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) - if not event.internal_metadata.is_outlier(): + if not event.internal_metadata.is_outlier() and not backfilled: yield self.action_generator.handle_push_actions_for_event( event, context ) @@ -1427,7 +1458,7 @@ class FederationHandler(BaseHandler): if not backfilled: # this intentionally does not yield: we don't care about the result # and don't need to wait for it. - preserve_fn(self.pusher_pool.on_new_notifications)( + logcontext.preserve_fn(self.pusher_pool.on_new_notifications)( event_stream_id, max_stream_id ) @@ -1440,16 +1471,16 @@ class FederationHandler(BaseHandler): a bunch of outliers, but not a chunk of individual events that depend on each other for state calculations. """ - contexts = yield preserve_context_over_deferred(defer.gatherResults( + contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self._prep_event)( + logcontext.preserve_fn(self._prep_event)( origin, ev_info["event"], state=ev_info.get("state"), auth_events=ev_info.get("auth_events"), ) for ev_info in event_infos - ] + ], consumeErrors=True, )) yield self.store.persist_events( @@ -1606,7 +1637,7 @@ class FederationHandler(BaseHandler): context.rejected = RejectedReason.AUTH_ERROR - if event.type == EventTypes.GuestAccess: + if event.type == EventTypes.GuestAccess and not context.rejected: yield self.maybe_kick_guest_users(event) defer.returnValue(context) @@ -1757,18 +1788,17 @@ class FederationHandler(BaseHandler): # Do auth conflict res. logger.info("Different auth: %s", different_auth) - different_events = yield preserve_context_over_deferred(defer.gatherResults( - [ - preserve_fn(self.store.get_event)( + different_events = yield logcontext.make_deferred_yieldable( + defer.gatherResults([ + logcontext.preserve_fn(self.store.get_event)( d, allow_none=True, allow_rejected=False, ) for d in different_auth if d in have_events and not have_events[d] - ], - consumeErrors=True - )).addErrback(unwrapFirstError) + ], consumeErrors=True) + ).addErrback(unwrapFirstError) if different_events: local_view = dict(auth_events) @@ -2090,6 +2120,14 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function def on_exchange_third_party_invite_request(self, origin, room_id, event_dict): + """Handle an exchange_third_party_invite request from a remote server + + The remote server will call this when it wants to turn a 3pid invite + into a normal m.room.member invite. + + Returns: + Deferred: resolves (to None) + """ builder = self.event_builder_factory.new(event_dict) message_handler = self.hs.get_handlers().message_handler @@ -2108,9 +2146,12 @@ class FederationHandler(BaseHandler): raise e yield self._check_signature(event, context) + # XXX we send the invite here, but send_membership_event also sends it, + # so we end up making two requests. I think this is redundant. returned_invite = yield self.send_invite(origin, event) # TODO: Make sure the signatures actually are correct. event.signatures.update(returned_invite.signatures) + member_handler = self.hs.get_handlers().room_member_handler yield member_handler.send_membership_event(None, event, context) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index fe9d8848bc..eb7e052ba6 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from twisted.internet import defer from synapse.api.constants import EventTypes, Membership @@ -59,6 +58,8 @@ class MessageHandler(BaseHandler): self.action_generator = hs.get_action_generator() + self.spam_checker = hs.get_spam_checker() + @defer.inlineCallbacks def purge_history(self, room_id, event_id): event = yield self.store.get_event(event_id) @@ -322,6 +323,12 @@ class MessageHandler(BaseHandler): token_id=requester.access_token_id, txn_id=txn_id ) + + if self.spam_checker.check_event_for_spam(event): + raise SynapseError( + 403, "Spam is not permitted here", Codes.FORBIDDEN + ) + yield self.send_nonmember_event( requester, event, @@ -413,6 +420,51 @@ class MessageHandler(BaseHandler): [serialize_event(c, now) for c in room_state.values()] ) + @defer.inlineCallbacks + def get_joined_members(self, requester, room_id): + """Get all the joined members in the room and their profile information. + + If the user has left the room return the state events from when they left. + + Args: + requester(Requester): The user requesting state events. + room_id(str): The room ID to get all state events from. + Returns: + A dict of user_id to profile info + """ + user_id = requester.user.to_string() + if not requester.app_service: + # We check AS auth after fetching the room membership, as it + # requires us to pull out all joined members anyway. + membership, _ = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + if membership != Membership.JOIN: + raise NotImplementedError( + "Getting joined members after leaving is not implemented" + ) + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + + # If this is an AS, double check that they are allowed to see the members. + # This can either be because the AS user is in the room or becuase there + # is a user in the room that the AS is "interested in" + if requester.app_service and user_id not in users_with_profile: + for uid in users_with_profile: + if requester.app_service.is_interested_in_user(uid): + break + else: + # Loop fell through, AS has no interested users in room + raise AuthError(403, "Appservice not in room") + + defer.returnValue({ + user_id: { + "avatar_url": profile.avatar_url, + "display_name": profile.display_name, + } + for user_id, profile in users_with_profile.iteritems() + }) + @measure_func("_create_new_client_event") @defer.inlineCallbacks def _create_new_client_event(self, builder, requester=None, prev_event_ids=None): diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5698d28088..535ba9517c 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -60,6 +60,11 @@ class RoomCreationHandler(BaseHandler): }, } + def __init__(self, hs): + super(RoomCreationHandler, self).__init__(hs) + + self.spam_checker = hs.get_spam_checker() + @defer.inlineCallbacks def create_room(self, requester, config, ratelimit=True): """ Creates a new room. @@ -75,6 +80,9 @@ class RoomCreationHandler(BaseHandler): """ user_id = requester.user.to_string() + if not self.spam_checker.user_may_create_room(user_id): + raise SynapseError(403, "You are not permitted to create rooms") + if ratelimit: yield self.ratelimit(requester) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index b3f979b246..36a8ef8ce0 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -48,6 +48,7 @@ class RoomMemberHandler(BaseHandler): self.member_linearizer = Linearizer(name="member") self.clock = hs.get_clock() + self.spam_checker = hs.get_spam_checker() self.distributor = hs.get_distributor() self.distributor.declare("user_joined_room") @@ -191,6 +192,8 @@ class RoomMemberHandler(BaseHandler): if action in ["kick", "unban"]: effective_membership_state = "leave" + # if this is a join with a 3pid signature, we may need to turn a 3pid + # invite into a normal invite before we can handle the join. if third_party_signed is not None: replication = self.hs.get_replication_layer() yield replication.exchange_third_party_invite( @@ -208,6 +211,30 @@ class RoomMemberHandler(BaseHandler): if is_blocked: raise SynapseError(403, "This room has been blocked on this server") + if effective_membership_state == "invite": + block_invite = False + is_requester_admin = yield self.auth.is_server_admin( + requester.user, + ) + if not is_requester_admin: + if self.hs.config.block_non_admin_invites: + logger.info( + "Blocking invite: user is not admin and non-admin " + "invites disabled" + ) + block_invite = True + + if not self.spam_checker.user_may_invite( + requester.user.to_string(), target.to_string(), room_id, + ): + logger.info("Blocking invite due to spam checker") + block_invite = True + + if block_invite: + raise SynapseError( + 403, "Invites have been disabled on this server", + ) + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) current_state_ids = yield self.state_handler.get_current_state_ids( room_id, latest_event_ids=latest_event_ids, @@ -471,6 +498,16 @@ class RoomMemberHandler(BaseHandler): requester, txn_id ): + if self.hs.config.block_non_admin_invites: + is_requester_admin = yield self.auth.is_server_admin( + requester.user, + ) + if not is_requester_admin: + raise SynapseError( + 403, "Invites have been disabled on this server", + Codes.FORBIDDEN, + ) + invitee = yield self._lookup_3pid( id_server, medium, address ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 91c6c6be3c..dd0ec00ae6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -108,6 +108,16 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ return True +class DeviceLists(collections.namedtuple("DeviceLists", [ + "changed", # list of user_ids whose devices may have changed + "left", # list of user_ids whose devices we no longer track +])): + __slots__ = [] + + def __nonzero__(self): + return bool(self.changed or self.left) + + class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. @@ -290,10 +300,20 @@ class SyncHandler(object): if recents: recents = sync_config.filter_collection.filter_room_timeline(recents) + + # We check if there are any state events, if there are then we pass + # all current state events to the filter_events function. This is to + # ensure that we always include current state in the timeline + current_state_ids = frozenset() + if any(e.is_state() for e in recents): + current_state_ids = yield self.state.get_current_state_ids(room_id) + current_state_ids = frozenset(current_state_ids.itervalues()) + recents = yield filter_events_for_client( self.store, sync_config.user.to_string(), recents, + always_include_ids=current_state_ids, ) else: recents = [] @@ -325,10 +345,20 @@ class SyncHandler(object): loaded_recents = sync_config.filter_collection.filter_room_timeline( events ) + + # We check if there are any state events, if there are then we pass + # all current state events to the filter_events function. This is to + # ensure that we always include current state in the timeline + current_state_ids = frozenset() + if any(e.is_state() for e in loaded_recents): + current_state_ids = yield self.state.get_current_state_ids(room_id) + current_state_ids = frozenset(current_state_ids.itervalues()) + loaded_recents = yield filter_events_for_client( self.store, sync_config.user.to_string(), loaded_recents, + always_include_ids=current_state_ids, ) loaded_recents.extend(recents) recents = loaded_recents @@ -535,7 +565,8 @@ class SyncHandler(object): res = yield self._generate_sync_entry_for_rooms( sync_result_builder, account_data_by_room ) - newly_joined_rooms, newly_joined_users = res + newly_joined_rooms, newly_joined_users, _, _ = res + _, _, newly_left_rooms, newly_left_users = res block_all_presence_data = ( since_token is None and @@ -549,7 +580,11 @@ class SyncHandler(object): yield self._generate_sync_entry_for_to_device(sync_result_builder) device_lists = yield self._generate_sync_entry_for_device_list( - sync_result_builder + sync_result_builder, + newly_joined_rooms=newly_joined_rooms, + newly_joined_users=newly_joined_users, + newly_left_rooms=newly_left_rooms, + newly_left_users=newly_left_users, ) device_id = sync_config.device_id @@ -574,25 +609,50 @@ class SyncHandler(object): @measure_func("_generate_sync_entry_for_device_list") @defer.inlineCallbacks - def _generate_sync_entry_for_device_list(self, sync_result_builder): + def _generate_sync_entry_for_device_list(self, sync_result_builder, + newly_joined_rooms, newly_joined_users, + newly_left_rooms, newly_left_users): user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token if since_token and since_token.device_list_key: - room_ids = yield self.store.get_rooms_for_user(user_id) - - user_ids_changed = set() changed = yield self.store.get_user_whose_devices_changed( since_token.device_list_key ) - for other_user_id in changed: - other_room_ids = yield self.store.get_rooms_for_user(other_user_id) - if room_ids.intersection(other_room_ids): - user_ids_changed.add(other_user_id) - defer.returnValue(user_ids_changed) + # TODO: Be more clever than this, i.e. remove users who we already + # share a room with? + for room_id in newly_joined_rooms: + joined_users = yield self.state.get_current_user_in_room(room_id) + newly_joined_users.update(joined_users) + + for room_id in newly_left_rooms: + left_users = yield self.state.get_current_user_in_room(room_id) + newly_left_users.update(left_users) + + # TODO: Check that these users are actually new, i.e. either they + # weren't in the previous sync *or* they left and rejoined. + changed.update(newly_joined_users) + + if not changed and not newly_left_users: + defer.returnValue(DeviceLists( + changed=[], + left=newly_left_users, + )) + + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) + + defer.returnValue(DeviceLists( + changed=users_who_share_room & changed, + left=set(newly_left_users) - users_who_share_room, + )) else: - defer.returnValue([]) + defer.returnValue(DeviceLists( + changed=[], + left=[], + )) @defer.inlineCallbacks def _generate_sync_entry_for_to_device(self, sync_result_builder): @@ -756,8 +816,8 @@ class SyncHandler(object): account_data_by_room(dict): Dictionary of per room account data Returns: - Deferred(tuple): Returns a 2-tuple of - `(newly_joined_rooms, newly_joined_users)` + Deferred(tuple): Returns a 4-tuple of + `(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)` """ user_id = sync_result_builder.sync_config.user.to_string() block_all_room_ephemeral = ( @@ -788,7 +848,7 @@ class SyncHandler(object): ) if not tags_by_room: logger.debug("no-oping sync") - defer.returnValue(([], [])) + defer.returnValue(([], [], [], [])) ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( "m.ignored_user_list", user_id=user_id, @@ -801,7 +861,7 @@ class SyncHandler(object): if since_token: res = yield self._get_rooms_changed(sync_result_builder, ignored_users) - room_entries, invited, newly_joined_rooms = res + room_entries, invited, newly_joined_rooms, newly_left_rooms = res tags_by_room = yield self.store.get_updated_tags( user_id, since_token.account_data_key, @@ -809,6 +869,7 @@ class SyncHandler(object): else: res = yield self._get_all_rooms(sync_result_builder, ignored_users) room_entries, invited, newly_joined_rooms = res + newly_left_rooms = [] tags_by_room = yield self.store.get_tags_for_user(user_id) @@ -829,17 +890,30 @@ class SyncHandler(object): # Now we want to get any newly joined users newly_joined_users = set() + newly_left_users = set() if since_token: for joined_sync in sync_result_builder.joined: it = itertools.chain( - joined_sync.timeline.events, joined_sync.state.values() + joined_sync.timeline.events, joined_sync.state.itervalues() ) for event in it: if event.type == EventTypes.Member: if event.membership == Membership.JOIN: newly_joined_users.add(event.state_key) - - defer.returnValue((newly_joined_rooms, newly_joined_users)) + else: + prev_content = event.unsigned.get("prev_content", {}) + prev_membership = prev_content.get("membership", None) + if prev_membership == Membership.JOIN: + newly_left_users.add(event.state_key) + + newly_left_users -= newly_joined_users + + defer.returnValue(( + newly_joined_rooms, + newly_joined_users, + newly_left_rooms, + newly_left_users, + )) @defer.inlineCallbacks def _have_rooms_changed(self, sync_result_builder): @@ -909,15 +983,28 @@ class SyncHandler(object): mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) newly_joined_rooms = [] + newly_left_rooms = [] room_entries = [] invited = [] - for room_id, events in mem_change_events_by_room_id.items(): + for room_id, events in mem_change_events_by_room_id.iteritems(): non_joins = [e for e in events if e.membership != Membership.JOIN] has_join = len(non_joins) != len(events) # We want to figure out if we joined the room at some point since # the last sync (even if we have since left). This is to make sure # we do send down the room, and with full state, where necessary + + old_state_ids = None + if room_id in joined_room_ids and non_joins: + # Always include if the user (re)joined the room, especially + # important so that device list changes are calculated correctly. + # If there are non join member events, but we are still in the room, + # then the user must have left and joined + newly_joined_rooms.append(room_id) + + # User is in the room so we don't need to do the invite/leave checks + continue + if room_id in joined_room_ids or has_join: old_state_ids = yield self.get_state_at(room_id, since_token) old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None) @@ -929,12 +1016,33 @@ class SyncHandler(object): if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: newly_joined_rooms.append(room_id) - if room_id in joined_room_ids: - continue + # If user is in the room then we don't need to do the invite/leave checks + if room_id in joined_room_ids: + continue if not non_joins: continue + # Check if we have left the room. This can either be because we were + # joined before *or* that we since joined and then left. + if events[-1].membership != Membership.JOIN: + if has_join: + newly_left_rooms.append(room_id) + else: + if not old_state_ids: + old_state_ids = yield self.get_state_at(room_id, since_token) + old_mem_ev_id = old_state_ids.get( + (EventTypes.Member, user_id), + None, + ) + old_mem_ev = None + if old_mem_ev_id: + old_mem_ev = yield self.store.get_event( + old_mem_ev_id, allow_none=True + ) + if old_mem_ev and old_mem_ev.membership == Membership.JOIN: + newly_left_rooms.append(room_id) + # Only bother if we're still currently invited should_invite = non_joins[-1].membership == Membership.INVITE if should_invite: @@ -1012,7 +1120,7 @@ class SyncHandler(object): upto_token=since_token, )) - defer.returnValue((room_entries, invited, newly_joined_rooms)) + defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms)) @defer.inlineCallbacks def _get_all_rooms(self, sync_result_builder, ignored_users): @@ -1260,6 +1368,7 @@ class SyncResultBuilder(object): self.invited = [] self.archived = [] self.device = [] + self.to_device = [] class RoomSyncResultBuilder(object): |